# C2Sentinel API Reference

Complete technical documentation for the C2Sentinel Python API.

**Author:** Daniel Ostrow

**Website:** [neuralintellect.com](https://neuralintellect.com)

---

## Table of Contents

1. [C2Sentinel Class](#c2sentinel-class)
2. [AnalysisResult Class](#analysisresult-class)
3. [ConnectionContext Class](#connectioncontext-class)
4. [ReconSupport Class](#reconsupport-class)
5. [FeatureExtractor Class](#featureextractor-class)
6. [LogParser Class](#logparser-class)
7. [Enums and Constants](#enums-and-constants)
8. [Convenience Functions](#convenience-functions)
9. [Error Handling](#error-handling)

---

## C2Sentinel Class

Main interface for C2 detection.

### Constructor

```python
C2Sentinel(model: LogBERTC2Sentinel, config: C2SentinelConfig, device: str = 'auto')
```

| Parameter | Type | Description |
|-----------|------|-------------|
| `model` | LogBERTC2Sentinel | The neural network model |
| `config` | C2SentinelConfig | Model configuration |
| `device` | str | Device for inference ('auto', 'cpu', 'cuda') |

### Class Methods

#### load

```python
@classmethod
def load(cls, path: str, device: str = 'auto') -> 'C2Sentinel'
```

Load a pre-trained model from safetensors format.

| Parameter | Type | Description |
|-----------|------|-------------|
| `path` | str | Path to model files (without extension) |
| `device` | str | Device for inference |

**Returns:** C2Sentinel instance

**Example:**

```python
sentinel = C2Sentinel.load('c2_sentinel')
sentinel = C2Sentinel.load('/path/to/c2_sentinel', device='cuda')
```

#### create_new

```python
@classmethod
def create_new(cls, device: str = 'auto') -> 'C2Sentinel'
```

Create a new untrained model instance.

**Returns:** C2Sentinel instance with random weights

---

### Instance Methods

#### analyze

```python
def analyze(
    self,
    connections: List[Dict],
    threshold: float = 0.5,
    context: Optional[ConnectionContext] = None,
    include_features: bool = False,
    strict_mode: bool = False
) -> AnalysisResult
```

Analyze a list of connections for C2 activity.


| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `connections` | List[Dict] | required | List of connection records |
| `threshold` | float | 0.5 | Detection threshold (0.0-1.0) |
| `context` | ConnectionContext | None | Optional context for enrichment |
| `include_features` | bool | False | Include raw feature vector in result |
| `strict_mode` | bool | False | Enforce minimum 0.7 threshold |

**Returns:** AnalysisResult object

**Connection Record Fields:**

```python
{
    'timestamp': float,    # Required: Unix timestamp
    'dst_ip': str,         # Required: Destination IP
    'dst_port': int,       # Required: Destination port
    'bytes_sent': int,     # Required: Bytes sent
    'bytes_recv': int,     # Required: Bytes received
    'src_ip': str,         # Optional: Source IP
    'src_port': int,       # Optional: Source port
    'protocol': str,       # Optional: 'tcp' or 'udp'
    'duration': float      # Optional: Duration in seconds
}
```

**Example:**

```python
connections = [
    {'timestamp': 1000, 'dst_ip': '10.0.0.1', 'dst_port': 443, 'bytes_sent': 200, 'bytes_recv': 500},
    {'timestamp': 1060, 'dst_ip': '10.0.0.1', 'dst_port': 443, 'bytes_sent': 200, 'bytes_recv': 500},
]

result = sentinel.analyze(connections)
result = sentinel.analyze(connections, threshold=0.7, strict_mode=True)
```

---

#### analyze_batch

```python
def analyze_batch(
    self,
    connection_groups: List[List[Dict]],
    threshold: float = 0.5,
    contexts: Optional[List[ConnectionContext]] = None,
    parallel: bool = True
) -> List[AnalysisResult]
```

Analyze multiple connection groups.

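
Each entry in `connection_groups` is a list of connection records. As a minimal sketch of one way to build that structure from a flat list, the snippet below buckets records by destination IP. `group_by_destination` is a hypothetical helper written for illustration, not part of the C2Sentinel API, and it assumes every record carries the required `dst_ip` and `timestamp` fields.

```python
from collections import defaultdict
from typing import Dict, List


def group_by_destination(connections: List[Dict]) -> List[List[Dict]]:
    """Bucket records by dst_ip; each bucket is sorted by timestamp."""
    buckets: Dict[str, List[Dict]] = defaultdict(list)
    for conn in connections:
        buckets[conn['dst_ip']].append(conn)
    # Order groups by IP string for determinism, and records by time.
    return [
        sorted(buckets[ip], key=lambda c: c['timestamp'])
        for ip in sorted(buckets)
    ]


flat = [
    {'timestamp': 1060, 'dst_ip': '10.0.0.1', 'dst_port': 443, 'bytes_sent': 200, 'bytes_recv': 500},
    {'timestamp': 1000, 'dst_ip': '10.0.0.2', 'dst_port': 53, 'bytes_sent': 60, 'bytes_recv': 120},
    {'timestamp': 1000, 'dst_ip': '10.0.0.1', 'dst_port': 443, 'bytes_sent': 200, 'bytes_recv': 500},
]
groups = group_by_destination(flat)
# The resulting list of lists has the shape analyze_batch expects.
```
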

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `connection_groups` | List[List[Dict]] | required | List of connection lists |
| `threshold` | float | 0.5 | Detection threshold |
| `contexts` | List[ConnectionContext] | None | Context for each group |
| `parallel` | bool | True | Enable parallel processing |

**Returns:** List of AnalysisResult objects

**Example:**

```python
# conn1 ... conn6 are connection record dicts (see analyze)
groups = [
    [conn1, conn2, conn3],
    [conn4, conn5, conn6],
]
results = sentinel.analyze_batch(groups)
```

---

#### analyze_logs

```python
def analyze_logs(
    self,
    log_lines: List[str],
    group_by_dst: bool = True,
    threshold: float = 0.5
) -> List[Dict]
```

Parse and analyze raw log lines.

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `log_lines` | List[str] | required | Raw log lines |
| `group_by_dst` | bool | True | Group connections by destination IP |
| `threshold` | float | 0.5 | Detection threshold |

**Returns:** List of result dictionaries, sorted by probability (descending)

**Supported Formats:**

- JSON logs with standard fields
- Zeek/Bro conn.log (tab-separated)
- Syslog with IP:port patterns

**Example:**

```python
with open('conn.log') as f:
    lines = f.readlines()

results = sentinel.analyze_logs(lines, group_by_dst=True)
for r in results:
    print(f"{r['dst_ip']}: {r['c2_probability']}")
```

---

#### add_whitelist

```python
def add_whitelist(
    self,
    ips: List[str] = None,
    domains: List[str] = None
)
```

Add IPs or domains to the whitelist. Whitelisted destinations receive reduced C2 probability.


| Parameter | Type | Description |
|-----------|------|-------------|
| `ips` | List[str] | IP addresses to whitelist |
| `domains` | List[str] | Domain names to whitelist |

**Example:**

```python
sentinel.add_whitelist(
    ips=['8.8.8.8', '1.1.1.1'],
    domains=['google.com', 'github.com']
)
```

---

#### add_blacklist

```python
def add_blacklist(
    self,
    ips: List[str] = None,
    domains: List[str] = None
)
```

Add IPs or domains to the blacklist. Blacklisted destinations receive increased C2 probability.

| Parameter | Type | Description |
|-----------|------|-------------|
| `ips` | List[str] | IP addresses to blacklist |
| `domains` | List[str] | Domain names to blacklist |

---

#### save

```python
def save(self, path: str)
```

Save model to safetensors format.

| Parameter | Type | Description |
|-----------|------|-------------|
| `path` | str | Output path (without extension) |

Creates two files:

- `{path}.safetensors` - Model weights
- `{path}.json` - Configuration

---

### Instance Attributes

| Attribute | Type | Description |
|-----------|------|-------------|
| `model` | LogBERTC2Sentinel | The neural network |
| `config` | C2SentinelConfig | Model configuration |
| `device` | torch.device | Inference device |
| `feature_extractor` | FeatureExtractor | Feature extraction module |
| `log_parser` | LogParser | Log parsing module |
| `context_engine` | ContextInference | Context inference module |
| `recon` | ReconSupport | Reconnaissance module |

---

## AnalysisResult Class

Dataclass containing analysis results.


### Attributes

| Attribute | Type | Description |
|-----------|------|-------------|
| `is_c2` | bool | True if C2 detected |
| `c2_probability` | float | Probability score (0.0-1.0) |
| `anomaly_score` | float | Anomaly detection score |
| `evasion_score` | float | Evasion technique detection score |
| `confidence` | float | Model confidence in prediction |
| `c2_type` | str | Detected C2 framework type |
| `c2_type_confidence` | float | Confidence in C2 type classification |
| `detection_method` | str | Detection method used |
| `immediate_detection` | bool | True if signature-based detection |
| `context_applied` | bool | True if context was applied |
| `original_probability` | float | Probability before context adjustment |
| `probability_modifier` | float | Context probability modifier |
| `matched_legitimate_pattern` | str | Name of matched legitimate pattern |
| `legitimate_confidence` | float | Confidence in legitimate pattern match |
| `risk_factors` | List[str] | Factors supporting C2 classification |
| `mitigating_factors` | List[str] | Factors against C2 classification |
| `service_type` | str | Detected service type |
| `recommendations` | List[str] | Suggested follow-up actions |
| `features` | List[float] | Raw 40-dimensional feature vector |
| `connections_analyzed` | int | Number of connections processed |
| `suspicious_connections` | List[Dict] | All connections with individual scores (if C2 detected) |
| `iocs` | Dict | Extracted IOCs for threat intel (if C2 detected) |
| `time_range` | Dict | Start, end, and duration of analyzed traffic |
| `destination_summary` | Dict | Destination IPs, ports, and byte totals |

### Machine-Readable Output Fields

When C2 is detected, these fields are populated for scripting and automation:

**suspicious_connections** - List of all connections with scores:

```python
[
    {
        'index': 0,
        'timestamp': 1705600000,
        'src_ip': '192.168.1.100',
        'src_port': 52341,
        'dst_ip': '45.33.32.156',
        'dst_port': 443,
        'bytes_sent': 200,
        'bytes_recv': 500,
        'score': 0.92
    },
    ...
]
```

**iocs** - Indicators of Compromise for threat intel:

```python
{
    'ip_addresses': ['45.33.32.156'],
    'ports': [443],
    'c2_type': 'cobalt_strike',
    'timing_signature': {
        'mean_interval': 60.0,
        'interval_cv': 0.05
    },
    'size_signature': {
        'mean_bytes_sent': 200.0,
        'mean_bytes_recv': 500.0,
        'sent_cv': 0.02,
        'recv_cv': 0.03
    },
    'behavioral_indicators': ['Regular timing with consistent sizes', ...]
}
```

**time_range** - Temporal bounds of analyzed traffic:

```python
{
    'start': 1705600000.0,
    'end': 1705600420.0,
    'duration': 420.0
}
```

**destination_summary** - Traffic summary:

```python
{
    'unique_ips': ['45.33.32.156'],
    'unique_ports': [443],
    'destinations': {'45.33.32.156:443': 8},
    'total_bytes_sent': 1600,
    'total_bytes_recv': 4000
}
```

### Methods

#### to_dict

```python
def to_dict(self) -> Dict[str, Any]
```

Convert result to dictionary.

**Returns:** Dictionary representation of all attributes

---

#### to_json

```python
def to_json(self, indent: int = 2) -> str
```

Convert result to JSON string for scripting.

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `indent` | int | 2 | JSON indentation level |

**Returns:** JSON string of all attributes

**Example:**

```python
result = sentinel.analyze(connections)
json_output = result.to_json()

# Write to file
with open('detection_result.json', 'w') as f:
    f.write(result.to_json())

# Parse in pipeline
import json
data = json.loads(result.to_json())
```

---

#### to_ioc_format

```python
def to_ioc_format(self) -> Dict[str, Any]
```

Convert result to STIX-like format for threat intelligence platforms.

**Returns:**

```python
{
    'type': 'indicator',
    'spec_version': '2.1',
    'pattern_type': 'c2-beacon',
    'valid_from': timestamp,
    'labels': ['malicious-activity', 'c2'],
    'confidence': 92,
    'indicators': { ... }  # Same as iocs field
}
```

**Example:**

```python
result = sentinel.analyze(connections)
if result.is_c2:
    stix_indicator = result.to_ioc_format()
    # Send to threat intel platform
    # (send_to_misp is a placeholder for your own platform client)
    send_to_misp(stix_indicator)
```

---

## ConnectionContext Class

Dataclass for providing additional context to improve detection accuracy.

### Constructor

```python
ConnectionContext(
    # Process information
    process_name: Optional[str] = None,
    process_path: Optional[str] = None,
    process_pid: Optional[int] = None,
    parent_process: Optional[str] = None,
    command_line: Optional[str] = None,

    # Network metadata
    dns_queries: Optional[List[str]] = None,
    resolved_hostname: Optional[str] = None,
    tls_sni: Optional[str] = None,
    tls_ja3: Optional[str] = None,
    tls_ja3s: Optional[str] = None,
    certificate_issuer: Optional[str] = None,
    certificate_subject: Optional[str] = None,
    certificate_valid: Optional[bool] = None,
    http_user_agent: Optional[str] = None,
    http_host: Optional[str] = None,

    # Reputation
    ip_reputation: Optional[float] = None,
    domain_reputation: Optional[float] = None,
    known_good: Optional[bool] = None,
    known_bad: Optional[bool] = None,
    threat_intel_match: Optional[str] = None,

    # Host context
    source_hostname: Optional[str] = None,
    source_user: Optional[str] = None,
    source_is_server: Optional[bool] = None,
    source_is_workstation: Optional[bool] = None,

    # Additional
    geo_country: Optional[str] = None,
    geo_asn: Optional[str] = None,
    tags: Optional[List[str]] = None
)
```

### Attribute Details

| Attribute | Type | Effect on Analysis |
|-----------|------|-------------------|
| `process_name` | str | Known processes reduce probability |
| `known_good` | bool | True reduces probability by 90% |
| `known_bad` | bool | True increases probability by 5x |
| `ip_reputation` | float | Score > 0.8 reduces probability |
| `threat_intel_match` | str | Match increases probability by 5x |
| `tls_ja3` | str | Known C2 JA3 increases probability |
| `certificate_valid` | bool | False increases probability |

### Methods

#### to_dict

```python
def to_dict(self) -> Dict[str, Any]
```

Convert to dictionary, excluding None values.

---

## ReconSupport Class

Reconnaissance and enrichment utilities.

### Class Methods

#### analyze_ip

```python
@classmethod
def analyze_ip(cls, ip: str) -> Dict[str, Any]
```

Analyze an IP address.

| Parameter | Type | Description |
|-----------|------|-------------|
| `ip` | str | IP address to analyze |

**Returns:**

```python
{
    'ip': str,              # Original IP
    'is_valid': bool,       # Valid IP format
    'is_private': bool,     # RFC 1918 private range
    'is_loopback': bool,    # Loopback address
    'is_multicast': bool,   # Multicast address
    'is_cdn': bool,         # Known CDN range
    'cdn_provider': str,    # CDN name if applicable
    'ip_version': int,      # 4 or 6
    'reverse_dns': str,     # Reverse DNS lookup result
    'numeric': int          # Numeric representation
}
```

**Known CDN Ranges:**

- Cloudflare
- AWS
- Google Cloud
- Azure
- Akamai

---

#### analyze_connection_patterns

```python
@classmethod
def analyze_connection_patterns(cls, connections: List[Dict]) -> Dict[str, Any]
```

Analyze connection patterns for threat hunting.

| Parameter | Type | Description |
|-----------|------|-------------|
| `connections` | List[Dict] | Connection records |

**Returns:**

```python
{
    'connection_count': int,
    'unique_destinations': int,
    'unique_ports': int,
    'timing': {
        'duration_seconds': float,
        'mean_interval': float,
        'interval_stddev': float,
        'interval_cv': float        # Coefficient of variation
    },
    'volume': {
        'total_sent': int,
        'total_recv': int,
        'mean_sent': float,
        'mean_recv': float,
        'sent_recv_ratio': float
    },
    'ports': {
        port_number: count,         # Port distribution
        ...
    },
    'destinations': {
        ip: analyze_ip_result,      # Per-IP analysis
        ...
    },
    'indicators': {
        'single_destination': bool,
        'consistent_timing': bool,
        'consistent_sizes': bool,
        'uses_common_port': bool,
        'uses_high_port': bool,
        'has_cdn_destination': bool,
        'all_private_destinations': bool
    }
}
```

---

#### generate_iocs

```python
@classmethod
def generate_iocs(
    cls,
    connections: List[Dict],
    result: Dict
) -> Dict[str, List[str]]
```

Generate Indicators of Compromise from detected C2.

| Parameter | Type | Description |
|-----------|------|-------------|
| `connections` | List[Dict] | Connection records |
| `result` | Dict | Analysis result dictionary |

**Returns:**

```python
{
    'ips': List[str],                   # Destination IPs
    'ports': List[str],                 # Destination ports
    'timing_signatures': List[str],     # Beacon timing patterns
    'behavioral_indicators': List[str]  # Behavioral markers
}
```

Only generates IOCs if `result['is_c2']` is True.

---

## FeatureExtractor Class

Extracts 40-dimensional feature vectors from connections.

### Constants

#### C2_TYPES

List of detectable C2 framework types:

```python
[
    'unknown', 'metasploit', 'cobalt_strike', 'sliver', 'havoc',
    'mythic', 'poshc2', 'merlin', 'empire', 'covenant',
    'brute_ratel', 'koadic', 'pupy', 'silenttrinity', 'faction',
    'ibombshell', 'godoh', 'dnscat2', 'iodine', 'dns_generic',
    'http_custom', 'https_custom', 'websocket', 'domain_fronting',
    'cloud_fronting', 'cdn_abuse', 'apt_generic', 'apt28', 'apt29',
    'apt41', 'lazarus', 'fin7', 'turla', 'winnti', 'custom'
]
```

### Methods

#### extract_features

```python
def extract_features(self, connections: List[Dict]) -> np.ndarray
```

Extract 40-dimensional feature vector.


**Returns:** numpy array of shape (40,)

**Feature Groups:**

- Features 0-9: Timing (intervals, jitter, regularity, periodicity)
- Features 10-17: Destinations (diversity, persistence, ports)
- Features 18-27: Payload (sizes, ratios, consistency)
- Features 28-35: Evasion (jitter patterns, bursts, session length)
- Features 36-39: Advanced (night activity, fast beacon ratio, duration)

---

#### check_metasploit_signature

```python
def check_metasploit_signature(
    self,
    connections: List[Dict]
) -> Tuple[bool, float]
```

Check for Metasploit-specific signature patterns.

**Returns:** (is_metasploit, confidence)

---

#### check_ssh_keepalive

```python
def check_ssh_keepalive(
    self,
    connections: List[Dict]
) -> Tuple[bool, float]
```

Check for SSH keepalive pattern.

**Criteria:**

- Port 22
- Small packets (< 100 bytes)
- Symmetric traffic (sent/recv ratio 0.5-2.0)
- Consistent sizes (CV < 0.2)
- Regular intervals matching common keepalive values

**Returns:** (is_ssh_keepalive, confidence)

---

## LogParser Class

Parses various log formats into connection records.

### Static Methods

#### parse_json

```python
@staticmethod
def parse_json(log_line: str) -> Optional[Dict]
```

Parse JSON formatted log line.

**Recognized Fields:**

- timestamp, @timestamp
- src_ip, source_ip, src
- dst_ip, dest_ip, dst
- src_port, source_port
- dst_port, dest_port
- bytes_sent, bytes_out
- bytes_recv, bytes_in

---

#### parse_zeek_conn

```python
@staticmethod
def parse_zeek_conn(log_line: str) -> Optional[Dict]
```

Parse Zeek/Bro conn.log format (tab-separated).

---

#### parse_syslog

```python
@staticmethod
def parse_syslog(log_line: str) -> Optional[Dict]
```

Parse common syslog/netflow patterns.

**Recognized Patterns:**

- `YYYY-MM-DD HH:MM:SS ... IP:port -> IP:port`
- `src=IP ... dst=IP ... sport=port ... dport=port`

---

## Enums and Constants

### DetectionMethod

```python
class DetectionMethod(Enum):
    SIGNATURE = "signature"      # Port + behavior signature match
    BEHAVIORAL = "behavioral"    # Pure behavioral analysis
    ML = "ml"                    # Machine learning inference
    CONTEXT = "context"          # Context-adjusted detection
    HEURISTIC = "heuristic"      # Rule-based detection
    WHITELIST = "whitelist"      # Matched whitelist pattern
```

### ServiceType

```python
class ServiceType(Enum):
    SSH = "ssh"
    HTTP = "http"
    HTTPS = "https"
    DNS = "dns"
    DATABASE = "database"
    API = "api"
    STREAMING = "streaming"
    GAMING = "gaming"
    VPN = "vpn"
    MONITORING = "monitoring"
    UNKNOWN = "unknown"
```

### C2_INDICATOR_PORTS

High-confidence C2 signature ports:

```python
{4444, 4445, 5555, 31337, 40056}
```

### C2_COMMON_PORTS

Ports commonly used by C2 (require behavioral analysis):

```python
{80, 443, 53, 8080, 8443, 8888}
```

---

## Convenience Functions

### load_model

```python
def load_model(path: str, device: str = 'auto') -> C2Sentinel
```

Shorthand for `C2Sentinel.load()`.

### create_model

```python
def create_model(device: str = 'auto') -> C2Sentinel
```

Shorthand for `C2Sentinel.create_new()`.

### quick_analyze

```python
def quick_analyze(
    connections: List[Dict],
    model_path: str = 'c2_sentinel'
) -> AnalysisResult
```

One-shot analysis without keeping model in memory.

---

## Error Handling

The API uses standard Python exceptions:

| Exception | Cause |
|-----------|-------|
| `FileNotFoundError` | Model files not found |
| `ValueError` | Invalid connection format |
| `RuntimeError` | CUDA/device errors |

All methods handle empty or malformed input gracefully, returning neutral results rather than raising exceptions.
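
Because malformed input may yield a neutral result instead of an exception, a caller who prefers to fail loudly can check records before analysis. The snippet below is a minimal sketch of such a pre-check against the required fields listed under Connection Record Fields. `REQUIRED_FIELDS` and `validate_connections` are hypothetical names introduced here, not part of the C2Sentinel API, and accepting `int` as well as `float` timestamps is an assumption based on the integer timestamps used in the examples.

```python
from typing import Dict, List

# Required fields and accepted types (assumption: int timestamps are allowed).
REQUIRED_FIELDS = {
    'timestamp': (int, float),
    'dst_ip': str,
    'dst_port': int,
    'bytes_sent': int,
    'bytes_recv': int,
}


def validate_connections(connections: List[Dict]) -> None:
    """Raise ValueError for the first record with a missing or mistyped field."""
    for i, conn in enumerate(connections):
        for field, expected in REQUIRED_FIELDS.items():
            if field not in conn:
                raise ValueError(f"connection {i}: missing required field '{field}'")
            if not isinstance(conn[field], expected):
                actual = type(conn[field]).__name__
                raise ValueError(f"connection {i}: field '{field}' has type {actual}")


good = [{'timestamp': 1000, 'dst_ip': '10.0.0.1', 'dst_port': 443,
         'bytes_sent': 200, 'bytes_recv': 500}]
bad = [{'timestamp': 1000, 'dst_ip': '10.0.0.1'}]

validate_connections(good)  # A complete record passes without raising.

try:
    validate_connections(bad)
    error_message = None
except ValueError as exc:
    error_message = str(exc)
```

Running the check before `analyze` keeps data-quality problems separate from detection results, at the cost of one extra pass over the records.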