|
|
|
|
|
""" |
|
|
C2Sentinel Advanced Usage Example |
|
|
|
|
|
Demonstrates context enrichment, whitelist/blacklist management, |
|
|
batch analysis, log parsing, and reconnaissance features. |
|
|
""" |
|
|
|
|
|
from c2sentinel import C2Sentinel, ConnectionContext |
|
|
|
|
|
def main(): |
|
|
|
|
|
sentinel = C2Sentinel.load('c2_sentinel') |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print("=" * 60) |
|
|
print("Context Enrichment") |
|
|
print("=" * 60) |
|
|
|
|
|
|
|
|
connections = [] |
|
|
timestamp = 1705600000 |
|
|
|
|
|
for i in range(10): |
|
|
connections.append({ |
|
|
'timestamp': timestamp + (i * 60), |
|
|
'dst_ip': '10.0.0.50', |
|
|
'dst_port': 443, |
|
|
'bytes_sent': 200, |
|
|
'bytes_recv': 500, |
|
|
}) |
|
|
|
|
|
|
|
|
result_no_ctx = sentinel.analyze(connections) |
|
|
print(f"Without context: is_c2={result_no_ctx.is_c2}, prob={result_no_ctx.c2_probability:.2f}") |
|
|
|
|
|
|
|
|
context = ConnectionContext( |
|
|
process_name='prometheus', |
|
|
known_good=True, |
|
|
ip_reputation=0.95, |
|
|
dns_queries=['metrics.internal.company.com'] |
|
|
) |
|
|
|
|
|
result_with_ctx = sentinel.analyze(connections, context=context) |
|
|
print(f"With context: is_c2={result_with_ctx.is_c2}, prob={result_with_ctx.c2_probability:.2f}") |
|
|
print(f"Context applied: {result_with_ctx.context_applied}") |
|
|
print() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print("=" * 60) |
|
|
print("Whitelist and Blacklist") |
|
|
print("=" * 60) |
|
|
|
|
|
|
|
|
sentinel.add_whitelist( |
|
|
ips=['8.8.8.8', '1.1.1.1'], |
|
|
domains=['google.com', 'cloudflare.com'] |
|
|
) |
|
|
|
|
|
|
|
|
sentinel.add_blacklist( |
|
|
ips=['10.10.10.10'], |
|
|
domains=['malware.example.com'] |
|
|
) |
|
|
|
|
|
|
|
|
dns_connections = [] |
|
|
for i in range(10): |
|
|
dns_connections.append({ |
|
|
'timestamp': timestamp + (i * 5), |
|
|
'dst_ip': '8.8.8.8', |
|
|
'dst_port': 53, |
|
|
'bytes_sent': 50, |
|
|
'bytes_recv': 200, |
|
|
}) |
|
|
|
|
|
result = sentinel.analyze(dns_connections) |
|
|
print(f"Whitelisted DNS (8.8.8.8): is_c2={result.is_c2}") |
|
|
|
|
|
|
|
|
blacklist_connections = [] |
|
|
for i in range(10): |
|
|
blacklist_connections.append({ |
|
|
'timestamp': timestamp + (i * 60), |
|
|
'dst_ip': '10.10.10.10', |
|
|
'dst_port': 443, |
|
|
'bytes_sent': 200, |
|
|
'bytes_recv': 500, |
|
|
}) |
|
|
|
|
|
result = sentinel.analyze(blacklist_connections) |
|
|
print(f"Blacklisted IP (10.10.10.10): is_c2={result.is_c2}, prob={result.c2_probability:.2f}") |
|
|
print() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print("=" * 60) |
|
|
print("Batch Analysis") |
|
|
print("=" * 60) |
|
|
|
|
|
|
|
|
connection_groups = [] |
|
|
|
|
|
|
|
|
web_group = [] |
|
|
for i, dest in enumerate(['93.184.216.34', '151.101.1.140', '172.217.14.206']): |
|
|
for j in range(3): |
|
|
web_group.append({ |
|
|
'timestamp': timestamp + (i * 10) + j, |
|
|
'dst_ip': dest, |
|
|
'dst_port': 443, |
|
|
'bytes_sent': 100 + (j * 50), |
|
|
'bytes_recv': 5000 + (j * 1000), |
|
|
}) |
|
|
connection_groups.append(web_group) |
|
|
|
|
|
|
|
|
beacon_group = [] |
|
|
for i in range(10): |
|
|
beacon_group.append({ |
|
|
'timestamp': timestamp + (i * 60), |
|
|
'dst_ip': '45.33.32.156', |
|
|
'dst_port': 8080, |
|
|
'bytes_sent': 200, |
|
|
'bytes_recv': 500, |
|
|
}) |
|
|
connection_groups.append(beacon_group) |
|
|
|
|
|
|
|
|
db_group = [] |
|
|
for i in range(15): |
|
|
db_group.append({ |
|
|
'timestamp': timestamp + (i * 0.5), |
|
|
'dst_ip': '10.0.1.100', |
|
|
'dst_port': 5432, |
|
|
'bytes_sent': 100 + (i * 10), |
|
|
'bytes_recv': 2000 + (i * 500), |
|
|
}) |
|
|
connection_groups.append(db_group) |
|
|
|
|
|
|
|
|
results = sentinel.analyze_batch(connection_groups) |
|
|
|
|
|
for i, result in enumerate(results): |
|
|
print(f"Group {i+1}: is_c2={result.is_c2}, prob={result.c2_probability:.2f}, " |
|
|
f"pattern={result.matched_legitimate_pattern or 'None'}") |
|
|
print() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print("=" * 60) |
|
|
print("Reconnaissance Features") |
|
|
print("=" * 60) |
|
|
|
|
|
|
|
|
print("\nIP Analysis:") |
|
|
ip_info = sentinel.recon.analyze_ip('104.16.132.229') |
|
|
print(f" IP: 104.16.132.229") |
|
|
print(f" Valid: {ip_info['is_valid']}") |
|
|
print(f" Private: {ip_info['is_private']}") |
|
|
print(f" CDN: {ip_info['is_cdn']}") |
|
|
if ip_info['cdn_provider']: |
|
|
print(f" CDN Provider: {ip_info['cdn_provider']}") |
|
|
|
|
|
|
|
|
print("\nConnection Pattern Analysis:") |
|
|
patterns = sentinel.recon.analyze_connection_patterns(beacon_group) |
|
|
print(f" Mean Interval: {patterns['timing']['mean_interval']:.2f}s") |
|
|
print(f" Interval CV: {patterns['timing']['interval_cv']:.4f}") |
|
|
print(f" Mean Bytes Sent: {patterns['volume']['mean_bytes_sent']:.0f}") |
|
|
print(f" Single Destination: {patterns['behavioral']['single_destination']}") |
|
|
|
|
|
|
|
|
print("\nIOC Generation:") |
|
|
beacon_result = sentinel.analyze(beacon_group) |
|
|
if beacon_result.is_c2: |
|
|
iocs = sentinel.recon.generate_iocs(beacon_group, beacon_result.to_dict()) |
|
|
print(f" IPs: {iocs['ips']}") |
|
|
print(f" Ports: {iocs['ports']}") |
|
|
print(f" Timing Signature: {iocs['timing_signatures']}") |
|
|
print() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print("=" * 60) |
|
|
print("Log File Parsing") |
|
|
print("=" * 60) |
|
|
|
|
|
|
|
|
json_logs = [ |
|
|
'{"timestamp": 1705600000, "dst_ip": "10.0.0.1", "dst_port": 443, "bytes_sent": 200, "bytes_recv": 500}', |
|
|
'{"timestamp": 1705600060, "dst_ip": "10.0.0.1", "dst_port": 443, "bytes_sent": 200, "bytes_recv": 500}', |
|
|
'{"timestamp": 1705600120, "dst_ip": "10.0.0.1", "dst_port": 443, "bytes_sent": 200, "bytes_recv": 500}', |
|
|
'{"timestamp": 1705600180, "dst_ip": "10.0.0.1", "dst_port": 443, "bytes_sent": 200, "bytes_recv": 500}', |
|
|
'{"timestamp": 1705600240, "dst_ip": "10.0.0.1", "dst_port": 443, "bytes_sent": 200, "bytes_recv": 500}', |
|
|
] |
|
|
|
|
|
results = sentinel.analyze_logs(json_logs, group_by_dst=True) |
|
|
print(f"Analyzed {len(json_logs)} log lines") |
|
|
for dst, result in results.items(): |
|
|
print(f" {dst}: is_c2={result.is_c2}, prob={result.c2_probability:.2f}") |
|
|
print() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print("=" * 60) |
|
|
print("Full Result Object") |
|
|
print("=" * 60) |
|
|
|
|
|
result = sentinel.analyze(beacon_group) |
|
|
result_dict = result.to_dict() |
|
|
|
|
|
for key, value in result_dict.items(): |
|
|
if isinstance(value, list) and len(value) > 3: |
|
|
print(f" {key}: [{value[0]}, {value[1]}, ... ({len(value)} items)]") |
|
|
else: |
|
|
print(f" {key}: {value}") |
|
|
|
|
|
|
|
|
if __name__ == '__main__': |
|
|
main() |
|
|
|