Add machine-readable output fields for scripting (connections_analyzed, suspicious_connections, iocs, time_range, destination_summary)
Browse files- c2sentinel.py +90 -1
c2sentinel.py
CHANGED
|
@@ -1438,11 +1438,34 @@ class AnalysisResult:
|
|
| 1438 |
# Raw features
|
| 1439 |
features: List[float] = field(default_factory=list)
|
| 1440 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1441 |
def to_dict(self) -> Dict[str, Any]:
|
| 1442 |
return asdict(self)
|
| 1443 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1444 |
def __repr__(self) -> str:
|
| 1445 |
-
status = "
|
| 1446 |
return f"<AnalysisResult: {status} | prob={self.c2_probability:.3f} | type={self.c2_type}>"
|
| 1447 |
|
| 1448 |
|
|
@@ -1760,6 +1783,72 @@ class C2Sentinel:
|
|
| 1760 |
if include_features:
|
| 1761 |
result.features = features.tolist()
|
| 1762 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1763 |
return result
|
| 1764 |
|
| 1765 |
def analyze_batch(
|
|
|
|
| 1438 |
# Raw features
|
| 1439 |
features: List[float] = field(default_factory=list)
|
| 1440 |
|
| 1441 |
+
# Connection-level details for scripting
|
| 1442 |
+
connections_analyzed: int = 0
|
| 1443 |
+
suspicious_connections: List[Dict] = field(default_factory=list)
|
| 1444 |
+
iocs: Dict[str, Any] = field(default_factory=dict)
|
| 1445 |
+
time_range: Dict[str, float] = field(default_factory=dict)
|
| 1446 |
+
destination_summary: Dict[str, Any] = field(default_factory=dict)
|
| 1447 |
+
|
| 1448 |
def to_dict(self) -> Dict[str, Any]:
|
| 1449 |
return asdict(self)
|
| 1450 |
|
| 1451 |
+
def to_json(self, indent: int = 2) -> str:
|
| 1452 |
+
"""Return JSON-formatted result for scripting."""
|
| 1453 |
+
return json.dumps(self.to_dict(), indent=indent, default=str)
|
| 1454 |
+
|
| 1455 |
+
def to_ioc_format(self) -> Dict[str, Any]:
|
| 1456 |
+
"""Return IOCs in STIX-like format for threat intel platforms."""
|
| 1457 |
+
return {
|
| 1458 |
+
'type': 'indicator',
|
| 1459 |
+
'spec_version': '2.1',
|
| 1460 |
+
'pattern_type': 'c2-beacon',
|
| 1461 |
+
'valid_from': self.time_range.get('start'),
|
| 1462 |
+
'labels': ['malicious-activity', 'c2'] if self.is_c2 else ['benign'],
|
| 1463 |
+
'confidence': int(self.confidence * 100),
|
| 1464 |
+
'indicators': self.iocs
|
| 1465 |
+
}
|
| 1466 |
+
|
| 1467 |
def __repr__(self) -> str:
|
| 1468 |
+
status = "C2 DETECTED" if self.is_c2 else "Clean"
|
| 1469 |
return f"<AnalysisResult: {status} | prob={self.c2_probability:.3f} | type={self.c2_type}>"
|
| 1470 |
|
| 1471 |
|
|
|
|
| 1783 |
if include_features:
|
| 1784 |
result.features = features.tolist()
|
| 1785 |
|
| 1786 |
+
# ================================================================
|
| 1787 |
+
# PHASE 8: Populate machine-readable output fields
|
| 1788 |
+
# ================================================================
|
| 1789 |
+
|
| 1790 |
+
result.connections_analyzed = len(connections)
|
| 1791 |
+
|
| 1792 |
+
# Time range
|
| 1793 |
+
timestamps = [c.get('timestamp', 0) for c in connections if c.get('timestamp')]
|
| 1794 |
+
if timestamps:
|
| 1795 |
+
result.time_range = {
|
| 1796 |
+
'start': min(timestamps),
|
| 1797 |
+
'end': max(timestamps),
|
| 1798 |
+
'duration': max(timestamps) - min(timestamps)
|
| 1799 |
+
}
|
| 1800 |
+
|
| 1801 |
+
# Destination summary
|
| 1802 |
+
dst_port_counts = {}
|
| 1803 |
+
for conn in connections:
|
| 1804 |
+
dst_ip = conn.get('dst_ip', '')
|
| 1805 |
+
dst_port = conn.get('dst_port', 0)
|
| 1806 |
+
key = f"{dst_ip}:{dst_port}"
|
| 1807 |
+
dst_port_counts[key] = dst_port_counts.get(key, 0) + 1
|
| 1808 |
+
|
| 1809 |
+
result.destination_summary = {
|
| 1810 |
+
'unique_ips': list(dst_ips),
|
| 1811 |
+
'unique_ports': list(ports),
|
| 1812 |
+
'destinations': dst_port_counts,
|
| 1813 |
+
'total_bytes_sent': total_sent,
|
| 1814 |
+
'total_bytes_recv': total_recv
|
| 1815 |
+
}
|
| 1816 |
+
|
| 1817 |
+
# Suspicious connections - mark each with a score
|
| 1818 |
+
if result.is_c2:
|
| 1819 |
+
# All connections to a detected C2 destination are suspicious
|
| 1820 |
+
for i, conn in enumerate(connections):
|
| 1821 |
+
result.suspicious_connections.append({
|
| 1822 |
+
'index': i,
|
| 1823 |
+
'timestamp': conn.get('timestamp'),
|
| 1824 |
+
'src_ip': conn.get('src_ip', ''),
|
| 1825 |
+
'src_port': conn.get('src_port', 0),
|
| 1826 |
+
'dst_ip': conn.get('dst_ip', ''),
|
| 1827 |
+
'dst_port': conn.get('dst_port', 0),
|
| 1828 |
+
'bytes_sent': conn.get('bytes_sent', 0),
|
| 1829 |
+
'bytes_recv': conn.get('bytes_recv', 0),
|
| 1830 |
+
'score': result.c2_probability
|
| 1831 |
+
})
|
| 1832 |
+
|
| 1833 |
+
# IOCs (Indicators of Compromise)
|
| 1834 |
+
if result.is_c2:
|
| 1835 |
+
result.iocs = {
|
| 1836 |
+
'ip_addresses': list(dst_ips),
|
| 1837 |
+
'ports': list(ports),
|
| 1838 |
+
'c2_type': result.c2_type,
|
| 1839 |
+
'timing_signature': {
|
| 1840 |
+
'mean_interval': float(np.mean(np.diff(sorted(timestamps)))) if len(timestamps) > 1 else 0,
|
| 1841 |
+
'interval_cv': float(np.std(np.diff(sorted(timestamps))) / (np.mean(np.diff(sorted(timestamps))) + 1e-6)) if len(timestamps) > 1 else 0
|
| 1842 |
+
},
|
| 1843 |
+
'size_signature': {
|
| 1844 |
+
'mean_bytes_sent': float(np.mean(bytes_sent)) if bytes_sent else 0,
|
| 1845 |
+
'mean_bytes_recv': float(np.mean(bytes_recv)) if bytes_recv else 0,
|
| 1846 |
+
'sent_cv': float(sent_cv),
|
| 1847 |
+
'recv_cv': float(recv_cv)
|
| 1848 |
+
},
|
| 1849 |
+
'behavioral_indicators': result.risk_factors
|
| 1850 |
+
}
|
| 1851 |
+
|
| 1852 |
return result
|
| 1853 |
|
| 1854 |
def analyze_batch(
|