|
|
|
|
|
""" |
|
|
C2Sentinel Basic Usage Example |
|
|
|
|
|
Demonstrates loading the model and analyzing network connections |
|
|
for C2 beacon detection. |
|
|
""" |
|
|
|
|
|
from c2sentinel import C2Sentinel |
|
|
|
|
|
def main():
    """Run four C2Sentinel demonstrations and print their results.

    Loads the model named 'c2_sentinel', analyzes three synthetic series of
    ten connection records each, and finally re-analyzes the first series
    with alternative ``threshold`` and ``strict_mode`` settings.
    """
    detector = C2Sentinel.load('c2_sentinel')

    # Every synthetic series starts at the same timestamp.
    # NOTE(review): looks like Unix epoch seconds -- confirm against the
    # format C2Sentinel.analyze expects.
    epoch = 1705600000

    # Example 1: ten records to 10.0.0.100:443, timestamps 60 apart.
    beacon_traffic = [
        {
            'timestamp': epoch + 60 * step,
            'dst_ip': '10.0.0.100',
            'dst_port': 443,
            'bytes_sent': 200,
            'bytes_recv': 500,
        }
        for step in range(10)
    ]
    beacon_report = detector.analyze(beacon_traffic)

    print("Example 1: Regular beacon pattern")
    print(f" Is C2: {beacon_report.is_c2}")
    print(f" Probability: {beacon_report.c2_probability:.2f}")
    print(f" C2 Type: {beacon_report.c2_type}")
    print(f" Detection Method: {beacon_report.detection_method}")
    print()

    # Example 2: ten small symmetric records to port 22, timestamps 30 apart.
    ssh_traffic = [
        {
            'timestamp': epoch + 30 * step,
            'dst_ip': '192.168.1.50',
            'dst_port': 22,
            'bytes_sent': 48,
            'bytes_recv': 48,
        }
        for step in range(10)
    ]
    ssh_report = detector.analyze(ssh_traffic)

    print("Example 2: SSH keepalive pattern")
    print(f" Is C2: {ssh_report.is_c2}")
    print(f" Matched Pattern: {ssh_report.matched_legitimate_pattern}")
    print(f" Service Type: {ssh_report.service_type}")
    print()

    # Example 3: ten records to 45.33.32.156:4444, timestamps 30 apart.
    suspicious_traffic = [
        {
            'timestamp': epoch + 30 * step,
            'dst_ip': '45.33.32.156',
            'dst_port': 4444,
            'bytes_sent': 150,
            'bytes_recv': 300,
        }
        for step in range(10)
    ]
    suspicious_report = detector.analyze(suspicious_traffic)

    print("Example 3: High-confidence C2 port")
    print(f" Is C2: {suspicious_report.is_c2}")
    print(f" C2 Type: {suspicious_report.c2_type}")
    print(f" Probability: {suspicious_report.c2_probability:.2f}")
    print(f" Immediate Detection: {suspicious_report.immediate_detection}")
    print(f" Risk Factors: {suspicious_report.risk_factors}")
    print()

    # Example 4: re-run the first series under different decision settings.
    print("Example 4: Threshold adjustment")

    lenient = detector.analyze(beacon_traffic, threshold=0.3)
    print(f" Low threshold (0.3): is_c2={lenient.is_c2}, prob={lenient.c2_probability:.2f}")

    conservative = detector.analyze(beacon_traffic, threshold=0.7)
    print(f" High threshold (0.7): is_c2={conservative.is_c2}, prob={conservative.c2_probability:.2f}")

    strict = detector.analyze(beacon_traffic, strict_mode=True)
    print(f" Strict mode: is_c2={strict.is_c2}, prob={strict.c2_probability:.2f}")
|
|
|
|
|
|
|
|
# Run the demonstration only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
|
|