
C2Sentinel API Reference

Complete technical documentation for the C2Sentinel Python API.

Author: Daniel Ostrow
Website: neuralintellect.com


Table of Contents

  1. C2Sentinel Class
  2. AnalysisResult Class
  3. ConnectionContext Class
  4. ReconSupport Class
  5. FeatureExtractor Class
  6. LogParser Class
  7. Enums and Constants
  8. Convenience Functions
  9. Error Handling

C2Sentinel Class

Main interface for C2 detection.

Constructor

C2Sentinel(model: LogBERTC2Sentinel, config: C2SentinelConfig, device: str = 'auto')
| Parameter | Type | Description |
|-----------|------|-------------|
| model | LogBERTC2Sentinel | The neural network model |
| config | C2SentinelConfig | Model configuration |
| device | str | Device for inference ('auto', 'cpu', 'cuda') |

Class Methods

load

@classmethod
def load(cls, path: str, device: str = 'auto') -> 'C2Sentinel'

Load a pre-trained model from safetensors format.

| Parameter | Type | Description |
|-----------|------|-------------|
| path | str | Path to model files (without extension) |
| device | str | Device for inference |

Returns: C2Sentinel instance

Example:

sentinel = C2Sentinel.load('c2_sentinel')
sentinel = C2Sentinel.load('/path/to/c2_sentinel', device='cuda')

create_new

@classmethod
def create_new(cls, device: str = 'auto') -> 'C2Sentinel'

Create a new untrained model instance.

Returns: C2Sentinel instance with random weights


Instance Methods

analyze

def analyze(
    self,
    connections: List[Dict],
    threshold: float = 0.5,
    context: Optional[ConnectionContext] = None,
    include_features: bool = False,
    strict_mode: bool = False
) -> AnalysisResult

Analyze a list of connections for C2 activity.

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| connections | List[Dict] | required | List of connection records |
| threshold | float | 0.5 | Detection threshold (0.0-1.0) |
| context | ConnectionContext | None | Optional context for enrichment |
| include_features | bool | False | Include raw feature vector in result |
| strict_mode | bool | False | Enforce minimum 0.7 threshold |

Returns: AnalysisResult object

Connection Record Fields:

{
    'timestamp': float,      # Required: Unix timestamp
    'dst_ip': str,           # Required: Destination IP
    'dst_port': int,         # Required: Destination port
    'bytes_sent': int,       # Required: Bytes sent
    'bytes_recv': int,       # Required: Bytes received
    'src_ip': str,           # Optional: Source IP
    'src_port': int,         # Optional: Source port
    'protocol': str,         # Optional: 'tcp' or 'udp'
    'duration': float        # Optional: Duration in seconds
}

Example:

connections = [
    {'timestamp': 1000, 'dst_ip': '10.0.0.1', 'dst_port': 443,
     'bytes_sent': 200, 'bytes_recv': 500},
    {'timestamp': 1060, 'dst_ip': '10.0.0.1', 'dst_port': 443,
     'bytes_sent': 200, 'bytes_recv': 500},
]

result = sentinel.analyze(connections)
result = sentinel.analyze(connections, threshold=0.7, strict_mode=True)
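
Before handing records to analyze, it can help to confirm that each one carries the required fields listed above. The following is a minimal sketch of such a pre-check, not part of the C2Sentinel API: `REQUIRED_FIELDS` and `find_record_problems` are hypothetical names, and accepting an integer timestamp is an assumption based on the example records.

```python
from typing import Any, Dict, List

# Required fields and accepted types, taken from the record schema above.
# Assumption: integer timestamps are accepted alongside floats.
REQUIRED_FIELDS = {
    'timestamp': (int, float),
    'dst_ip': str,
    'dst_port': int,
    'bytes_sent': int,
    'bytes_recv': int,
}


def find_record_problems(record: Dict[str, Any]) -> List[str]:
    """Hypothetical helper: list the problems found in one record (empty if none)."""
    problems = []
    for field, expected in REQUIRED_FIELDS.items():
        if field not in record:
            problems.append(f"missing required field: {field}")
        # bool is a subclass of int in Python, so reject it explicitly.
        elif isinstance(record[field], bool) or not isinstance(record[field], expected):
            problems.append(f"wrong type for {field}: {type(record[field]).__name__}")
    return problems


records = [
    {'timestamp': 1000, 'dst_ip': '10.0.0.1', 'dst_port': 443,
     'bytes_sent': 200, 'bytes_recv': 500},
    # Port given as a string and byte counts missing.
    {'timestamp': 1060, 'dst_ip': '10.0.0.1', 'dst_port': '443'},
]
usable = [r for r in records if not find_record_problems(r)]
```

Only the records in `usable` would then be passed on for analysis.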

analyze_batch

def analyze_batch(
    self,
    connection_groups: List[List[Dict]],
    threshold: float = 0.5,
    contexts: Optional[List[ConnectionContext]] = None,
    parallel: bool = True
) -> List[AnalysisResult]

Analyze multiple connection groups.

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| connection_groups | List[List[Dict]] | required | List of connection lists |
| threshold | float | 0.5 | Detection threshold |
| contexts | List[ConnectionContext] | None | Context for each group |
| parallel | bool | True | Enable parallel processing |

Returns: List of AnalysisResult objects

Example:

groups = [
    [conn1, conn2, conn3],
    [conn4, conn5, conn6],
]
results = sentinel.analyze_batch(groups)

analyze_logs

def analyze_logs(
    self,
    log_lines: List[str],
    group_by_dst: bool = True,
    threshold: float = 0.5
) -> List[Dict]

Parse and analyze raw log lines.

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| log_lines | List[str] | required | Raw log lines |
| group_by_dst | bool | True | Group connections by destination IP |
| threshold | float | 0.5 | Detection threshold |

Returns: List of result dictionaries, sorted by probability (descending)

Supported Formats:

  • JSON logs with standard fields
  • Zeek/Bro conn.log (tab-separated)
  • Syslog with IP:port patterns

Example:

with open('conn.log') as f:
    lines = f.readlines()

results = sentinel.analyze_logs(lines, group_by_dst=True)
for r in results:
    print(f"{r['dst_ip']}: {r['c2_probability']}")
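
To illustrate what grouping by destination and sorting by probability amount to, here is a small standalone sketch. It is an assumption about the general approach rather than the library's code; `group_by_destination` and `sort_results` are hypothetical helpers.

```python
from collections import defaultdict
from typing import Dict, List


def group_by_destination(connections: List[Dict]) -> Dict[str, List[Dict]]:
    """Hypothetical helper: bucket records by 'dst_ip', each bucket ordered by time."""
    groups: Dict[str, List[Dict]] = defaultdict(list)
    for conn in connections:
        groups[conn['dst_ip']].append(conn)
    for bucket in groups.values():
        bucket.sort(key=lambda c: c['timestamp'])
    return dict(groups)


def sort_results(results: List[Dict]) -> List[Dict]:
    """Order result dictionaries by 'c2_probability', highest first."""
    return sorted(results, key=lambda r: r['c2_probability'], reverse=True)


connections = [
    {'timestamp': 1060, 'dst_ip': '10.0.0.1', 'dst_port': 443},
    {'timestamp': 1000, 'dst_ip': '10.0.0.1', 'dst_port': 443},
    {'timestamp': 1010, 'dst_ip': '10.0.0.2', 'dst_port': 80},
]
groups = group_by_destination(connections)
```

Each bucket in `groups` corresponds to one destination whose traffic would be scored together.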

add_whitelist

def add_whitelist(
    self,
    ips: List[str] = None,
    domains: List[str] = None
)

Add IPs or domains to the whitelist. Whitelisted destinations receive reduced C2 probability.

| Parameter | Type | Description |
|-----------|------|-------------|
| ips | List[str] | IP addresses to whitelist |
| domains | List[str] | Domain names to whitelist |

Example:

sentinel.add_whitelist(
    ips=['8.8.8.8', '1.1.1.1'],
    domains=['google.com', 'github.com']
)

add_blacklist

def add_blacklist(
    self,
    ips: List[str] = None,
    domains: List[str] = None
)

Add IPs or domains to the blacklist. Blacklisted destinations receive increased C2 probability.

| Parameter | Type | Description |
|-----------|------|-------------|
| ips | List[str] | IP addresses to blacklist |
| domains | List[str] | Domain names to blacklist |

save

def save(self, path: str)

Save model to safetensors format.

| Parameter | Type | Description |
|-----------|------|-------------|
| path | str | Output path (without extension) |

Creates two files:

  • {path}.safetensors - Model weights
  • {path}.json - Configuration

Instance Attributes

| Attribute | Type | Description |
|-----------|------|-------------|
| model | LogBERTC2Sentinel | The neural network |
| config | C2SentinelConfig | Model configuration |
| device | torch.device | Inference device |
| feature_extractor | FeatureExtractor | Feature extraction module |
| log_parser | LogParser | Log parsing module |
| context_engine | ContextInference | Context inference module |
| recon | ReconSupport | Reconnaissance module |

AnalysisResult Class

Dataclass containing analysis results.

Attributes

| Attribute | Type | Description |
|-----------|------|-------------|
| is_c2 | bool | True if C2 detected |
| c2_probability | float | Probability score (0.0-1.0) |
| anomaly_score | float | Anomaly detection score |
| evasion_score | float | Evasion technique detection score |
| confidence | float | Model confidence in prediction |
| c2_type | str | Detected C2 framework type |
| c2_type_confidence | float | Confidence in C2 type classification |
| detection_method | str | Detection method used |
| immediate_detection | bool | True if signature-based detection |
| context_applied | bool | True if context was applied |
| original_probability | float | Probability before context adjustment |
| probability_modifier | float | Context probability modifier |
| matched_legitimate_pattern | str | Name of matched legitimate pattern |
| legitimate_confidence | float | Confidence in legitimate pattern match |
| risk_factors | List[str] | Factors supporting C2 classification |
| mitigating_factors | List[str] | Factors against C2 classification |
| service_type | str | Detected service type |
| recommendations | List[str] | Suggested follow-up actions |
| features | List[float] | Raw 40-dimensional feature vector |
| connections_analyzed | int | Number of connections processed |
| suspicious_connections | List[Dict] | All connections with individual scores (if C2 detected) |
| iocs | Dict | Extracted IOCs for threat intel (if C2 detected) |
| time_range | Dict | Start, end, and duration of analyzed traffic |
| destination_summary | Dict | Destination IPs, ports, and byte totals |

Machine-Readable Output Fields

These fields are intended for scripting and automation. Per the attribute table, suspicious_connections and iocs are populated when C2 is detected:

suspicious_connections - List of all connections with scores:

[
    {
        'index': 0,
        'timestamp': 1705600000,
        'src_ip': '192.168.1.100',
        'src_port': 52341,
        'dst_ip': '45.33.32.156',
        'dst_port': 443,
        'bytes_sent': 200,
        'bytes_recv': 500,
        'score': 0.92
    },
    ...
]

iocs - Indicators of Compromise for threat intel:

{
    'ip_addresses': ['45.33.32.156'],
    'ports': [443],
    'c2_type': 'cobalt_strike',
    'timing_signature': {
        'mean_interval': 60.0,
        'interval_cv': 0.05
    },
    'size_signature': {
        'mean_bytes_sent': 200.0,
        'mean_bytes_recv': 500.0,
        'sent_cv': 0.02,
        'recv_cv': 0.03
    },
    'behavioral_indicators': ['Regular timing with consistent sizes', ...]
}

time_range - Temporal bounds of analyzed traffic:

{
    'start': 1705600000.0,
    'end': 1705600420.0,
    'duration': 420.0
}

destination_summary - Traffic summary:

{
    'unique_ips': ['45.33.32.156'],
    'unique_ports': [443],
    'destinations': {'45.33.32.156:443': 8},
    'total_bytes_sent': 1600,
    'total_bytes_recv': 4000
}
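
The sample time_range and destination_summary values above are consistent with eight identical beacons sent 60 seconds apart. The sketch below recomputes them from raw records to show how the fields relate to the input. It is an illustration only; `summarize` is a hypothetical helper and assumes at least one record.

```python
from collections import Counter
from typing import Dict, List


def summarize(connections: List[Dict]) -> Dict[str, Dict]:
    """Hypothetical helper: derive both summaries from a non-empty record list."""
    timestamps = [float(c['timestamp']) for c in connections]
    start, end = min(timestamps), max(timestamps)
    # Count how many connections went to each "ip:port" pair.
    destinations = Counter(f"{c['dst_ip']}:{c['dst_port']}" for c in connections)
    return {
        'time_range': {'start': start, 'end': end, 'duration': end - start},
        'destination_summary': {
            'unique_ips': sorted({c['dst_ip'] for c in connections}),
            'unique_ports': sorted({c['dst_port'] for c in connections}),
            'destinations': dict(destinations),
            'total_bytes_sent': sum(c['bytes_sent'] for c in connections),
            'total_bytes_recv': sum(c['bytes_recv'] for c in connections),
        },
    }


# Eight beacons, 60 seconds apart, with the sizes used in the samples above.
beacons = [
    {'timestamp': 1705600000 + 60 * i, 'dst_ip': '45.33.32.156', 'dst_port': 443,
     'bytes_sent': 200, 'bytes_recv': 500}
    for i in range(8)
]
summary = summarize(beacons)
```

Seven 60-second intervals give the 420-second duration, and eight records of 200 and 500 bytes give the 1600 and 4000 byte totals.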

Methods

to_dict

def to_dict(self) -> Dict[str, Any]

Convert result to dictionary.

Returns: Dictionary representation of all attributes


to_json

def to_json(self, indent: int = 2) -> str

Convert result to JSON string for scripting.

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| indent | int | 2 | JSON indentation level |

Returns: JSON string of all attributes

Example:

result = sentinel.analyze(connections)
json_output = result.to_json()

# Write to file
with open('detection_result.json', 'w') as f:
    f.write(result.to_json())

# Parse in pipeline
import json
data = json.loads(result.to_json())

to_ioc_format

def to_ioc_format(self) -> Dict[str, Any]

Convert result to STIX-like format for threat intelligence platforms.

Returns:

{
    'type': 'indicator',
    'spec_version': '2.1',
    'pattern_type': 'c2-beacon',
    'valid_from': timestamp,
    'labels': ['malicious-activity', 'c2'],
    'confidence': 92,
    'indicators': { ... }  # Same as iocs field
}

Example:

result = sentinel.analyze(connections)
if result.is_c2:
    stix_indicator = result.to_ioc_format()
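    # Send to a threat intel platform. send_to_misp is a placeholder for
    # your own integration code; it is not defined in this reference.
    send_to_misp(stix_indicator)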
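
On the consuming side, a script might turn the returned dictionary into blocklist entries. The sketch below relies only on the keys documented above ('confidence', 'indicators', 'ip_addresses', 'ports'). `indicator_to_blocklist` and its `min_confidence` cutoff are hypothetical and not part of C2Sentinel.

```python
from typing import Any, Dict, List


def indicator_to_blocklist(indicator: Dict[str, Any], min_confidence: int = 80) -> List[str]:
    """Hypothetical helper: turn one indicator dict into 'ip:port' entries."""
    if indicator.get('confidence', 0) < min_confidence:
        return []
    iocs = indicator.get('indicators', {})
    ips = iocs.get('ip_addresses', [])
    ports = iocs.get('ports', [])
    return [f"{ip}:{port}" for ip in ips for port in ports]


# Sample shaped like the documented return value; 'valid_from' is left out
# because the helper does not read it.
sample_indicator = {
    'type': 'indicator',
    'spec_version': '2.1',
    'pattern_type': 'c2-beacon',
    'labels': ['malicious-activity', 'c2'],
    'confidence': 92,
    'indicators': {'ip_addresses': ['45.33.32.156'], 'ports': [443]},
}
entries = indicator_to_blocklist(sample_indicator)
```

Filtering on confidence before export is a design choice for this sketch; a real pipeline may prefer to forward every indicator and let the platform decide.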

ConnectionContext Class

Dataclass for providing additional context to improve detection accuracy.

Constructor

ConnectionContext(
    # Process information
    process_name: Optional[str] = None,
    process_path: Optional[str] = None,
    process_pid: Optional[int] = None,
    parent_process: Optional[str] = None,
    command_line: Optional[str] = None,

    # Network metadata
    dns_queries: Optional[List[str]] = None,
    resolved_hostname: Optional[str] = None,
    tls_sni: Optional[str] = None,
    tls_ja3: Optional[str] = None,
    tls_ja3s: Optional[str] = None,
    certificate_issuer: Optional[str] = None,
    certificate_subject: Optional[str] = None,
    certificate_valid: Optional[bool] = None,
    http_user_agent: Optional[str] = None,
    http_host: Optional[str] = None,

    # Reputation
    ip_reputation: Optional[float] = None,
    domain_reputation: Optional[float] = None,
    known_good: Optional[bool] = None,
    known_bad: Optional[bool] = None,
    threat_intel_match: Optional[str] = None,

    # Host context
    source_hostname: Optional[str] = None,
    source_user: Optional[str] = None,
    source_is_server: Optional[bool] = None,
    source_is_workstation: Optional[bool] = None,

    # Additional
    geo_country: Optional[str] = None,
    geo_asn: Optional[str] = None,
    tags: Optional[List[str]] = None
)

Attribute Details

| Attribute | Type | Effect on Analysis |
|-----------|------|--------------------|
| process_name | str | Known processes reduce probability |
| known_good | bool | True reduces probability by 90% |
| known_bad | bool | True increases probability by 5x |
| ip_reputation | float | Score > 0.8 reduces probability |
| threat_intel_match | str | Match increases probability by 5x |
| tls_ja3 | str | Known C2 JA3 increases probability |
| certificate_valid | bool | False increases probability |
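
The quantified effects in the table can be read as multipliers on the model's probability. The following sketch shows that reading for the three rows that state a number. How the library combines several modifiers, and whether it clamps the result, is not documented here, so the multiplication and the clamp below are assumptions; `apply_context` is a hypothetical function.

```python
from typing import Optional


def apply_context(probability: float,
                  known_good: Optional[bool] = None,
                  known_bad: Optional[bool] = None,
                  threat_intel_match: Optional[str] = None) -> float:
    """Hypothetical illustration of the multipliers listed in the table above."""
    modifier = 1.0
    if known_good:
        modifier *= 0.1   # "reduces probability by 90%"
    if known_bad:
        modifier *= 5.0   # "increases probability by 5x"
    if threat_intel_match:
        modifier *= 5.0   # "match increases probability by 5x"
    # Assumption: the adjusted value is clamped to the 0.0-1.0 range.
    return max(0.0, min(1.0, probability * modifier))


reduced = apply_context(0.5, known_good=True)   # 0.5 scaled down by 90%
boosted = apply_context(0.1, known_bad=True)    # 0.1 scaled up 5x
capped = apply_context(0.4, known_bad=True)     # would be 2.0 without the clamp
```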

Methods

to_dict

def to_dict(self) -> Dict[str, Any]

Convert to dictionary, excluding None values.


ReconSupport Class

Reconnaissance and enrichment utilities.

Class Methods

analyze_ip

@classmethod
def analyze_ip(cls, ip: str) -> Dict[str, Any]

Analyze an IP address.

| Parameter | Type | Description |
|-----------|------|-------------|
| ip | str | IP address to analyze |

Returns:

{
    'ip': str,              # Original IP
    'is_valid': bool,       # Valid IP format
    'is_private': bool,     # RFC 1918 private range
    'is_loopback': bool,    # Loopback address
    'is_multicast': bool,   # Multicast address
    'is_cdn': bool,         # Known CDN range
    'cdn_provider': str,    # CDN name if applicable
    'ip_version': int,      # 4 or 6
    'reverse_dns': str,     # Reverse DNS lookup result
    'numeric': int          # Numeric representation
}

Known CDN Ranges:

  • Cloudflare
  • AWS
  • Google Cloud
  • Azure
  • Akamai
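
Several of the returned fields can be approximated with Python's standard `ipaddress` module, which may be useful for sanity-checking output. This sketch is not the library's implementation: `basic_ip_facts` is a hypothetical name, CDN detection and reverse DNS are omitted, and `is_private` in the standard library covers more reserved ranges than RFC 1918 alone.

```python
import ipaddress
from typing import Any, Dict


def basic_ip_facts(ip: str) -> Dict[str, Any]:
    """Hypothetical helper covering the fields the standard library can answer."""
    try:
        addr = ipaddress.ip_address(ip)
    except ValueError:
        # Not parseable as IPv4 or IPv6.
        return {'ip': ip, 'is_valid': False}
    return {
        'ip': ip,
        'is_valid': True,
        'is_private': addr.is_private,      # broader than RFC 1918
        'is_loopback': addr.is_loopback,
        'is_multicast': addr.is_multicast,
        'ip_version': addr.version,         # 4 or 6
        'numeric': int(addr),               # integer form of the address
    }


internal = basic_ip_facts('10.0.0.1')
public = basic_ip_facts('8.8.8.8')
invalid = basic_ip_facts('not-an-ip')
```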

analyze_connection_patterns

@classmethod
def analyze_connection_patterns(cls, connections: List[Dict]) -> Dict[str, Any]

Analyze connection patterns for threat hunting.

| Parameter | Type | Description |
|-----------|------|-------------|
| connections | List[Dict] | Connection records |

Returns:

{
    'connection_count': int,
    'unique_destinations': int,
    'unique_ports': int,

    'timing': {
        'duration_seconds': float,
        'mean_interval': float,
        'interval_stddev': float,
        'interval_cv': float       # Coefficient of variation
    },

    'volume': {
        'total_sent': int,
        'total_recv': int,
        'mean_sent': float,
        'mean_recv': float,
        'sent_recv_ratio': float
    },

    'ports': {
        port_number: count,        # Port distribution
        ...
    },

    'destinations': {
        ip: analyze_ip_result,     # Per-IP analysis
        ...
    },

    'indicators': {
        'single_destination': bool,
        'consistent_timing': bool,
        'consistent_sizes': bool,
        'uses_common_port': bool,
        'uses_high_port': bool,
        'has_cdn_destination': bool,
        'all_private_destinations': bool
    }
}
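
The timing block hinges on the coefficient of variation (standard deviation divided by mean) of the gaps between connections: values near zero indicate very regular, beacon-like timing. A minimal sketch of that calculation follows. `timing_stats` is a hypothetical helper, and the use of the population standard deviation is an assumption.

```python
from statistics import mean, pstdev
from typing import Dict, List


def timing_stats(timestamps: List[float]) -> Dict[str, float]:
    """Hypothetical helper: interval statistics for a list of Unix timestamps."""
    ordered = sorted(timestamps)
    intervals = [b - a for a, b in zip(ordered, ordered[1:])]
    if not intervals:
        # Fewer than two connections: no intervals to measure.
        return {'duration_seconds': 0.0, 'mean_interval': 0.0,
                'interval_stddev': 0.0, 'interval_cv': 0.0}
    avg = mean(intervals)
    std = pstdev(intervals)  # assumption: population standard deviation
    return {
        'duration_seconds': float(ordered[-1] - ordered[0]),
        'mean_interval': float(avg),
        'interval_stddev': float(std),
        'interval_cv': std / avg if avg > 0 else 0.0,
    }


beacon = timing_stats([1000, 1060, 1120, 1180])   # perfectly regular
irregular = timing_stats([0, 10, 100, 130])       # uneven gaps
```

The regular series yields a coefficient of variation of zero, while the uneven one yields a much larger value; the cutoff that the library uses for 'consistent_timing' is not stated in this reference.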

generate_iocs

@classmethod
def generate_iocs(
    cls,
    connections: List[Dict],
    result: Dict
) -> Dict[str, List[str]]

Generate Indicators of Compromise from detected C2.

| Parameter | Type | Description |
|-----------|------|-------------|
| connections | List[Dict] | Connection records |
| result | Dict | Analysis result dictionary |

Returns:

{
    'ips': List[str],                  # Destination IPs
    'ports': List[str],                # Destination ports
    'timing_signatures': List[str],    # Beacon timing patterns
    'behavioral_indicators': List[str] # Behavioral markers
}

Only generates IOCs if result['is_c2'] is True.


FeatureExtractor Class

Extracts 40-dimensional feature vectors from connections.

Constants

C2_TYPES

List of detectable C2 framework types:

[
    'unknown', 'metasploit', 'cobalt_strike', 'sliver', 'havoc',
    'mythic', 'poshc2', 'merlin', 'empire', 'covenant',
    'brute_ratel', 'koadic', 'pupy', 'silenttrinity', 'faction',
    'ibombshell', 'godoh', 'dnscat2', 'iodine', 'dns_generic',
    'http_custom', 'https_custom', 'websocket', 'domain_fronting',
    'cloud_fronting', 'cdn_abuse', 'apt_generic', 'apt28', 'apt29',
    'apt41', 'lazarus', 'fin7', 'turla', 'winnti', 'custom'
]

Methods

extract_features

def extract_features(self, connections: List[Dict]) -> np.ndarray

Extract 40-dimensional feature vector.

Returns: numpy array of shape (40,)

Feature Groups:

  • Features 0-9: Timing (intervals, jitter, regularity, periodicity)
  • Features 10-17: Destinations (diversity, persistence, ports)
  • Features 18-27: Payload (sizes, ratios, consistency)
  • Features 28-35: Evasion (jitter patterns, bursts, session length)
  • Features 36-39: Advanced (night activity, fast beacon ratio, duration)
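
When inspecting a feature vector, it can be convenient to split it along the index ranges listed above. The sketch below does this with plain lists so it has no dependencies. `FEATURE_GROUPS` and `split_features` are hypothetical names, and the group labels are shorthand for the bullet headings.

```python
from typing import Dict, List, Sequence

# Half-open (start, stop) index ranges matching the feature groups above.
FEATURE_GROUPS = {
    'timing': (0, 10),
    'destinations': (10, 18),
    'payload': (18, 28),
    'evasion': (28, 36),
    'advanced': (36, 40),
}


def split_features(features: Sequence[float]) -> Dict[str, List[float]]:
    """Hypothetical helper: slice a 40-element vector into named groups."""
    if len(features) != 40:
        raise ValueError(f"expected 40 features, got {len(features)}")
    return {name: list(features[start:stop])
            for name, (start, stop) in FEATURE_GROUPS.items()}


# Dummy vector whose values equal their own index, to make the slicing visible.
vector = [float(i) for i in range(40)]
feature_groups = split_features(vector)
```

A NumPy array of shape (40,) can be passed in the same way, since it supports `len` and slicing.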

check_metasploit_signature

def check_metasploit_signature(
    self,
    connections: List[Dict]
) -> Tuple[bool, float]

Check for Metasploit-specific signature patterns.

Returns: (is_metasploit, confidence)


check_ssh_keepalive

def check_ssh_keepalive(
    self,
    connections: List[Dict]
) -> Tuple[bool, float]

Check for SSH keepalive pattern.

Criteria:

  • Port 22
  • Small packets (< 100 bytes)
  • Symmetric traffic (sent/recv ratio 0.5-2.0)
  • Consistent sizes (CV < 0.2)
  • Regular intervals matching common keepalive values

Returns: (is_ssh_keepalive, confidence)
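
The first four criteria can be expressed directly in code. The sketch below is an approximation, not the library's check. It computes the sent/recv ratio from totals, which is an assumption, and it omits the interval test because the "common keepalive values" are not listed in this reference. `looks_like_ssh_keepalive` is a hypothetical name and returns only a boolean, without a confidence value.

```python
from statistics import mean, pstdev
from typing import Dict, List


def coefficient_of_variation(values: List[float]) -> float:
    """Population standard deviation divided by the mean (0.0 for a zero mean)."""
    avg = mean(values)
    return pstdev(values) / avg if avg > 0 else 0.0


def looks_like_ssh_keepalive(connections: List[Dict]) -> bool:
    """Hypothetical check covering port, size, symmetry, and size consistency."""
    if not connections or any(c['dst_port'] != 22 for c in connections):
        return False
    sent = [c['bytes_sent'] for c in connections]
    recv = [c['bytes_recv'] for c in connections]
    if max(sent + recv) >= 100:          # small packets (< 100 bytes)
        return False
    total_recv = sum(recv)
    if total_recv == 0:
        return False
    ratio = sum(sent) / total_recv       # symmetric traffic (0.5-2.0)
    if not 0.5 <= ratio <= 2.0:
        return False
    # Consistent sizes (CV < 0.2) in both directions.
    return coefficient_of_variation(sent) < 0.2 and coefficient_of_variation(recv) < 0.2


keepalive = [{'timestamp': 30 * i, 'dst_ip': '10.0.0.5', 'dst_port': 22,
              'bytes_sent': 48, 'bytes_recv': 48} for i in range(5)]
transfer = [{'timestamp': 30 * i, 'dst_ip': '10.0.0.5', 'dst_port': 22,
             'bytes_sent': 5000, 'bytes_recv': 48} for i in range(5)]
```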


LogParser Class

Parses various log formats into connection records.

Static Methods

parse_json

@staticmethod
def parse_json(log_line: str) -> Optional[Dict]

Parse JSON formatted log line.

Recognized Fields:

  • timestamp, @timestamp
  • src_ip, source_ip, src
  • dst_ip, dest_ip, dst
  • src_port, source_port
  • dst_port, dest_port
  • bytes_sent, bytes_out
  • bytes_recv, bytes_in
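
The alias list above suggests a simple normalization step: whichever recognized name appears in the log line is mapped to the canonical record field. A minimal sketch of that idea, assuming the first name in each group is the canonical one, is shown here. `FIELD_ALIASES` and `normalize_json_line` are hypothetical names and do not convert values (for example, string timestamps are passed through unchanged).

```python
import json
from typing import Dict, Optional

# Canonical field name -> names recognized in the input, in priority order.
FIELD_ALIASES = {
    'timestamp': ('timestamp', '@timestamp'),
    'src_ip': ('src_ip', 'source_ip', 'src'),
    'dst_ip': ('dst_ip', 'dest_ip', 'dst'),
    'src_port': ('src_port', 'source_port'),
    'dst_port': ('dst_port', 'dest_port'),
    'bytes_sent': ('bytes_sent', 'bytes_out'),
    'bytes_recv': ('bytes_recv', 'bytes_in'),
}


def normalize_json_line(log_line: str) -> Optional[Dict]:
    """Hypothetical helper: parse one JSON line and rename recognized fields."""
    try:
        raw = json.loads(log_line)
    except json.JSONDecodeError:
        return None
    if not isinstance(raw, dict):
        return None
    record = {}
    for canonical, aliases in FIELD_ALIASES.items():
        for alias in aliases:
            if alias in raw:
                record[canonical] = raw[alias]
                break
    return record or None


line = ('{"@timestamp": 1705600000, "source_ip": "192.168.1.100", '
        '"dest_ip": "45.33.32.156", "dest_port": 443, '
        '"bytes_out": 200, "bytes_in": 500}')
record = normalize_json_line(line)
```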

parse_zeek_conn

@staticmethod
def parse_zeek_conn(log_line: str) -> Optional[Dict]

Parse Zeek/Bro conn.log format (tab-separated).


parse_syslog

@staticmethod
def parse_syslog(log_line: str) -> Optional[Dict]

Parse common syslog/netflow patterns.

Recognized Patterns:

  • YYYY-MM-DD HH:MM:SS ... IP:port -> IP:port
  • src=IP ... dst=IP ... sport=port ... dport=port
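
Both patterns can be matched with regular expressions. The sketch below shows one way to do so for IPv4 endpoints; it is an assumption about the approach, not the parser's actual expressions. `ARROW_RE`, `KEYVALUE_RE`, and `match_endpoints` are hypothetical names, and timestamp extraction is left out.

```python
import re
from typing import Dict, Optional

# Loose IPv4 matcher: four dot-separated groups of 1-3 digits (no range check).
IPV4 = r'\d{1,3}(?:\.\d{1,3}){3}'

# "... IP:port -> IP:port"
ARROW_RE = re.compile(
    rf'(?P<src_ip>{IPV4}):(?P<src_port>\d+)\s*->\s*(?P<dst_ip>{IPV4}):(?P<dst_port>\d+)'
)
# "src=IP ... dst=IP ... sport=port ... dport=port"
KEYVALUE_RE = re.compile(
    rf'src=(?P<src_ip>{IPV4}).*?dst=(?P<dst_ip>{IPV4})'
    rf'.*?sport=(?P<src_port>\d+).*?dport=(?P<dst_port>\d+)'
)


def match_endpoints(log_line: str) -> Optional[Dict]:
    """Hypothetical helper: extract source and destination endpoints, or None."""
    for pattern in (ARROW_RE, KEYVALUE_RE):
        m = pattern.search(log_line)
        if m:
            return {
                'src_ip': m['src_ip'], 'src_port': int(m['src_port']),
                'dst_ip': m['dst_ip'], 'dst_port': int(m['dst_port']),
            }
    return None


arrow = match_endpoints('2024-01-18 17:46:40 fw01 ALLOW 192.168.1.100:52341 -> 45.33.32.156:443')
keyvalue = match_endpoints('src=192.168.1.100 proto=tcp dst=45.33.32.156 sport=52341 dport=443')
```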

Enums and Constants

DetectionMethod

class DetectionMethod(Enum):
    SIGNATURE = "signature"    # Port + behavior signature match
    BEHAVIORAL = "behavioral"  # Pure behavioral analysis
    ML = "ml"                  # Machine learning inference
    CONTEXT = "context"        # Context-adjusted detection
    HEURISTIC = "heuristic"    # Rule-based detection
    WHITELIST = "whitelist"    # Matched whitelist pattern

ServiceType

class ServiceType(Enum):
    SSH = "ssh"
    HTTP = "http"
    HTTPS = "https"
    DNS = "dns"
    DATABASE = "database"
    API = "api"
    STREAMING = "streaming"
    GAMING = "gaming"
    VPN = "vpn"
    MONITORING = "monitoring"
    UNKNOWN = "unknown"

C2_INDICATOR_PORTS

High-confidence C2 signature ports:

{4444, 4445, 5555, 31337, 40056}

C2_COMMON_PORTS

Ports commonly used by C2 (require behavioral analysis):

{80, 443, 53, 8080, 8443, 8888}
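
The two sets imply a three-way split when triaging a destination port. A small sketch of that lookup follows. The set values are copied from above, while `port_category` and its return labels are hypothetical.

```python
# Values copied from the two constants documented above.
C2_INDICATOR_PORTS = {4444, 4445, 5555, 31337, 40056}
C2_COMMON_PORTS = {80, 443, 53, 8080, 8443, 8888}


def port_category(port: int) -> str:
    """Hypothetical helper: label a port by the set it belongs to."""
    if port in C2_INDICATOR_PORTS:
        return 'indicator'   # high-confidence signature port
    if port in C2_COMMON_PORTS:
        return 'common'      # needs behavioral analysis
    return 'other'


categories = {port: port_category(port) for port in (4444, 443, 22)}
```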

Convenience Functions

load_model

def load_model(path: str, device: str = 'auto') -> C2Sentinel

Shorthand for C2Sentinel.load().

create_model

def create_model(device: str = 'auto') -> C2Sentinel

Shorthand for C2Sentinel.create_new().

quick_analyze

def quick_analyze(
    connections: List[Dict],
    model_path: str = 'c2_sentinel'
) -> AnalysisResult

One-shot analysis without keeping the model in memory.


Error Handling

The API uses standard Python exceptions:

| Exception | Cause |
|-----------|-------|
| FileNotFoundError | Model files not found |
| ValueError | Invalid connection format |
| RuntimeError | CUDA/device errors |

Apart from the cases listed above, methods handle empty or malformed input gracefully, returning neutral results rather than raising exceptions.