#!/usr/bin/env python3 """ C2Sentinel Advanced Usage Example Demonstrates context enrichment, whitelist/blacklist management, batch analysis, log parsing, and reconnaissance features. """ from c2sentinel import C2Sentinel, ConnectionContext def main(): # Load the model sentinel = C2Sentinel.load('c2_sentinel') # ========================================================================= # Context Enrichment # ========================================================================= print("=" * 60) print("Context Enrichment") print("=" * 60) # Create connections that might look suspicious connections = [] timestamp = 1705600000 for i in range(10): connections.append({ 'timestamp': timestamp + (i * 60), 'dst_ip': '10.0.0.50', 'dst_port': 443, 'bytes_sent': 200, 'bytes_recv': 500, }) # Analyze without context result_no_ctx = sentinel.analyze(connections) print(f"Without context: is_c2={result_no_ctx.is_c2}, prob={result_no_ctx.c2_probability:.2f}") # Analyze with context indicating this is a known monitoring agent context = ConnectionContext( process_name='prometheus', known_good=True, ip_reputation=0.95, dns_queries=['metrics.internal.company.com'] ) result_with_ctx = sentinel.analyze(connections, context=context) print(f"With context: is_c2={result_with_ctx.is_c2}, prob={result_with_ctx.c2_probability:.2f}") print(f"Context applied: {result_with_ctx.context_applied}") print() # ========================================================================= # Whitelist and Blacklist Management # ========================================================================= print("=" * 60) print("Whitelist and Blacklist") print("=" * 60) # Add trusted infrastructure to whitelist sentinel.add_whitelist( ips=['8.8.8.8', '1.1.1.1'], domains=['google.com', 'cloudflare.com'] ) # Add known malicious indicators to blacklist sentinel.add_blacklist( ips=['10.10.10.10'], domains=['malware.example.com'] ) # Test whitelisted IP dns_connections = [] for i in range(10): dns_connections.append({ 'timestamp': timestamp + (i * 5), 'dst_ip': '8.8.8.8', 'dst_port': 53, 'bytes_sent': 50, 'bytes_recv': 200, }) result = sentinel.analyze(dns_connections) print(f"Whitelisted DNS (8.8.8.8): is_c2={result.is_c2}") # Test blacklisted IP blacklist_connections = [] for i in range(10): blacklist_connections.append({ 'timestamp': timestamp + (i * 60), 'dst_ip': '10.10.10.10', 'dst_port': 443, 'bytes_sent': 200, 'bytes_recv': 500, }) result = sentinel.analyze(blacklist_connections) print(f"Blacklisted IP (10.10.10.10): is_c2={result.is_c2}, prob={result.c2_probability:.2f}") print() # ========================================================================= # Batch Analysis # ========================================================================= print("=" * 60) print("Batch Analysis") print("=" * 60) # Create multiple connection groups for batch processing connection_groups = [] # Group 1: Normal web browsing (variable sizes, multiple destinations) web_group = [] for i, dest in enumerate(['93.184.216.34', '151.101.1.140', '172.217.14.206']): for j in range(3): web_group.append({ 'timestamp': timestamp + (i * 10) + j, 'dst_ip': dest, 'dst_port': 443, 'bytes_sent': 100 + (j * 50), 'bytes_recv': 5000 + (j * 1000), }) connection_groups.append(web_group) # Group 2: Potential C2 beacon beacon_group = [] for i in range(10): beacon_group.append({ 'timestamp': timestamp + (i * 60), 'dst_ip': '45.33.32.156', 'dst_port': 8080, 'bytes_sent': 200, 'bytes_recv': 500, }) connection_groups.append(beacon_group) # Group 3: Database connection pool db_group = [] for i in range(15): db_group.append({ 'timestamp': timestamp + (i * 0.5), 'dst_ip': '10.0.1.100', 'dst_port': 5432, 'bytes_sent': 100 + (i * 10), 'bytes_recv': 2000 + (i * 500), }) connection_groups.append(db_group) # Analyze all groups at once results = sentinel.analyze_batch(connection_groups) for i, result in enumerate(results): print(f"Group {i+1}: is_c2={result.is_c2}, prob={result.c2_probability:.2f}, " f"pattern={result.matched_legitimate_pattern or 'None'}") print() # ========================================================================= # Reconnaissance Features # ========================================================================= print("=" * 60) print("Reconnaissance Features") print("=" * 60) # IP Analysis print("\nIP Analysis:") ip_info = sentinel.recon.analyze_ip('104.16.132.229') print(f" IP: 104.16.132.229") print(f" Valid: {ip_info['is_valid']}") print(f" Private: {ip_info['is_private']}") print(f" CDN: {ip_info['is_cdn']}") if ip_info['cdn_provider']: print(f" CDN Provider: {ip_info['cdn_provider']}") # Connection Pattern Analysis print("\nConnection Pattern Analysis:") patterns = sentinel.recon.analyze_connection_patterns(beacon_group) print(f" Mean Interval: {patterns['timing']['mean_interval']:.2f}s") print(f" Interval CV: {patterns['timing']['interval_cv']:.4f}") print(f" Mean Bytes Sent: {patterns['volume']['mean_bytes_sent']:.0f}") print(f" Single Destination: {patterns['behavioral']['single_destination']}") # IOC Generation (only if C2 detected) print("\nIOC Generation:") beacon_result = sentinel.analyze(beacon_group) if beacon_result.is_c2: iocs = sentinel.recon.generate_iocs(beacon_group, beacon_result.to_dict()) print(f" IPs: {iocs['ips']}") print(f" Ports: {iocs['ports']}") print(f" Timing Signature: {iocs['timing_signatures']}") print() # ========================================================================= # Log File Parsing # ========================================================================= print("=" * 60) print("Log File Parsing") print("=" * 60) # Example with JSON log format json_logs = [ '{"timestamp": 1705600000, "dst_ip": "10.0.0.1", "dst_port": 443, "bytes_sent": 200, "bytes_recv": 500}', '{"timestamp": 1705600060, "dst_ip": "10.0.0.1", "dst_port": 443, "bytes_sent": 200, "bytes_recv": 500}', '{"timestamp": 1705600120, "dst_ip": "10.0.0.1", "dst_port": 443, "bytes_sent": 200, "bytes_recv": 500}', '{"timestamp": 1705600180, "dst_ip": "10.0.0.1", "dst_port": 443, "bytes_sent": 200, "bytes_recv": 500}', '{"timestamp": 1705600240, "dst_ip": "10.0.0.1", "dst_port": 443, "bytes_sent": 200, "bytes_recv": 500}', ] results = sentinel.analyze_logs(json_logs, group_by_dst=True) print(f"Analyzed {len(json_logs)} log lines") for dst, result in results.items(): print(f" {dst}: is_c2={result.is_c2}, prob={result.c2_probability:.2f}") print() # ========================================================================= # Result Object Details # ========================================================================= print("=" * 60) print("Full Result Object") print("=" * 60) result = sentinel.analyze(beacon_group) result_dict = result.to_dict() for key, value in result_dict.items(): if isinstance(value, list) and len(value) > 3: print(f" {key}: [{value[0]}, {value[1]}, ... ({len(value)} items)]") else: print(f" {key}: {value}") if __name__ == '__main__': main()