c2sentinel / app.py
danielostrow's picture
Show flagged connections and IOCs in output for remediation
afa2824 verified
#!/usr/bin/env python3
"""C2Sentinel Demo"""
import gradio as gr
import json
import re
from datetime import datetime
from huggingface_hub import hf_hub_download
# Load model from HuggingFace (force_download ensures latest version)
hf_hub_download(repo_id="danielostrow/c2sentinel", filename="c2sentinel.py", local_dir=".", force_download=True)
hf_hub_download(repo_id="danielostrow/c2sentinel", filename="c2_sentinel.safetensors", local_dir=".", force_download=True)
hf_hub_download(repo_id="danielostrow/c2sentinel", filename="c2_sentinel.json", local_dir=".", force_download=True)
hf_hub_download(repo_id="danielostrow/c2sentinel", filename="normalization_params.npz", local_dir=".", force_download=True)
from c2sentinel import C2Sentinel
sentinel = C2Sentinel.load('c2_sentinel')
EXAMPLES = {
"C2 Beacon": [{"timestamp": 1705600000+i*60, "dst_ip": "45.33.32.156", "dst_port": 443, "bytes_sent": 200, "bytes_recv": 500} for i in range(8)],
"Metasploit": [{"timestamp": 1705600000+i*30, "dst_ip": "10.10.10.10", "dst_port": 4444, "bytes_sent": 150, "bytes_recv": 300} for i in range(6)],
"SSH Keepalive": [{"timestamp": 1705600000+i*30, "dst_ip": "192.168.1.10", "dst_port": 22, "bytes_sent": 48, "bytes_recv": 48} for i in range(6)],
"Web Traffic": [{"timestamp": 1705600000+i*5, "dst_ip": f"93.184.{i}.34", "dst_port": 443, "bytes_sent": 500+i*100, "bytes_recv": 15000+i*5000} for i in range(5)],
}
def parse_ts(s):
if not s: return None
try:
t = float(s)
return t/1000 if t > 1e12 else t
except: pass
for f in ["%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S", "%b %d %H:%M:%S"]:
try:
dt = datetime.strptime(str(s).strip(), f)
if dt.year == 1900: dt = dt.replace(year=2026)
return dt.timestamp()
except: pass
return None
def parse_logs(content):
conns = []
for line in content.strip().split('\n'):
line = line.strip()
if not line or line.startswith('#'): continue
c = None
# JSON
try:
o = json.loads(line)
c = {
'timestamp': parse_ts(o.get('timestamp') or o.get('ts') or o.get('@timestamp') or o.get('time')),
'dst_ip': o.get('dst_ip') or o.get('dest_ip') or o.get('id.resp_h') or o.get('DestinationIP') or o.get('dst'),
'dst_port': o.get('dst_port') or o.get('dest_port') or o.get('id.resp_p') or o.get('DestinationPort') or o.get('dport'),
'bytes_sent': o.get('bytes_sent') or o.get('orig_bytes') or o.get('SentBytes') or 100,
'bytes_recv': o.get('bytes_recv') or o.get('resp_bytes') or o.get('ReceivedBytes') or 100,
}
except: pass
# Zeek
if not c:
p = line.split('\t')
if len(p) >= 10:
try:
c = {'timestamp': parse_ts(p[0]), 'dst_ip': p[4] if p[4]!='-' else None, 'dst_port': int(p[5]) if p[5]!='-' else 443,
'bytes_sent': int(p[9]) if p[9]!='-' else 100, 'bytes_recv': int(p[10]) if len(p)>10 and p[10]!='-' else 100}
except: pass
# Syslog
if not c:
m = re.match(r'^(\w{3}\s+\d+\s+\d+:\d+:\d+)\s+\S+\s+\S+:\s*(.*)$', line)
if m:
ip = re.search(r'(\d+\.\d+\.\d+\.\d+)', m.group(2))
if ip:
c = {'timestamp': parse_ts(m.group(1)), 'dst_ip': ip.group(1), 'dst_port': 443, 'bytes_sent': 100, 'bytes_recv': 100}
# Key=value
if not c and '=' in line:
kv = dict(re.findall(r'(\w+)=(\S+)', line))
ip = kv.get('DestAddress') or kv.get('DestinationIp') or kv.get('dst') or kv.get('RemoteAddress')
if ip:
c = {'timestamp': parse_ts(kv.get('TimeGenerated') or kv.get('EventTime')) or datetime.now().timestamp(),
'dst_ip': ip, 'dst_port': int(kv.get('DestPort', 443)), 'bytes_sent': 100, 'bytes_recv': 100}
# CSV
if not c and ',' in line:
for p in line.split(','):
ip = re.search(r'(\d+\.\d+\.\d+\.\d+)', p)
if ip:
c = {'timestamp': datetime.now().timestamp(), 'dst_ip': ip.group(1), 'dst_port': 443, 'bytes_sent': 100, 'bytes_recv': 100}
break
# Fallback
if not c:
ip = re.search(r'(\d+\.\d+\.\d+\.\d+)', line)
if ip:
c = {'timestamp': datetime.now().timestamp(), 'dst_ip': ip.group(1), 'dst_port': 443, 'bytes_sent': 100, 'bytes_recv': 100}
if c and c.get('dst_ip') and c.get('timestamp'):
c['dst_port'] = int(c.get('dst_port') or 443)
c['bytes_sent'] = int(c.get('bytes_sent') or 100)
c['bytes_recv'] = int(c.get('bytes_recv') or 100)
conns.append(c)
return conns
def analyze(text, file, example, thresh, wl, bl):
try:
sentinel.whitelist_ips = set()
sentinel.blacklist_ips = set()
if wl: sentinel.add_whitelist(ips=[x.strip() for x in wl.split(',') if x.strip()])
if bl: sentinel.add_blacklist(ips=[x.strip() for x in bl.split(',') if x.strip()])
conns = []
if file:
with open(file, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read()
conns = parse_logs(content)
if not conns:
try: conns = json.loads(content)
except: pass
if not conns and example in EXAMPLES:
conns = EXAMPLES[example]
if not conns and text:
conns = parse_logs(text)
if not conns:
try: conns = json.loads(text)
except: pass
if not conns: return "No valid connections found"
if len(conns) < 3: return f"Need 3+ connections (found {len(conns)})"
r = sentinel.analyze(conns, threshold=thresh)
out = f"### {'🚨 C2 DETECTED: '+r.c2_type if r.is_c2 else '✅ No C2 Detected'}\n\n"
out += f"**Probability:** {r.c2_probability:.0%} | **Confidence:** {r.confidence:.0%} | **Connections:** {r.connections_analyzed}\n\n"
if r.matched_legitimate_pattern:
out += f"**Matched Pattern:** {r.matched_legitimate_pattern}\n\n"
if r.risk_factors:
out += "**Risk Factors:**\n"
for rf in r.risk_factors[:5]:
out += f"- {rf}\n"
out += "\n"
if r.mitigating_factors:
out += "**Mitigating Factors:**\n"
for mf in r.mitigating_factors[:3]:
out += f"- {mf}\n"
out += "\n"
# Show destination summary
if r.destination_summary:
out += "**Destinations:**\n"
for dest, count in list(r.destination_summary.get('destinations', {}).items())[:5]:
out += f"- `{dest}` ({count} connections)\n"
out += "\n"
# Show time range
if r.time_range:
duration = r.time_range.get('duration', 0)
out += f"**Time Span:** {duration:.0f} seconds\n\n"
# If C2 detected, show the flagged connections
if r.is_c2 and r.suspicious_connections:
out += "**Flagged Connections:**\n```\n"
out += f"{'#':<3} {'Timestamp':<12} {'Destination':<22} {'Sent':<8} {'Recv':<8}\n"
out += "-" * 55 + "\n"
for sc in r.suspicious_connections[:10]:
ts = sc.get('timestamp', 0)
ts_str = str(int(ts))[-6:] if ts else "N/A"
dst = f"{sc.get('dst_ip', '?')}:{sc.get('dst_port', '?')}"
out += f"{sc.get('index', 0):<3} {ts_str:<12} {dst:<22} {sc.get('bytes_sent', 0):<8} {sc.get('bytes_recv', 0):<8}\n"
if len(r.suspicious_connections) > 10:
out += f"... and {len(r.suspicious_connections) - 10} more\n"
out += "```\n\n"
# Show IOCs for threat intel
if r.is_c2 and r.iocs:
out += "**IOCs for Threat Intel:**\n"
out += f"- IPs: `{', '.join(r.iocs.get('ip_addresses', []))}`\n"
out += f"- Ports: `{', '.join(map(str, r.iocs.get('ports', [])))}`\n"
if r.iocs.get('timing_signature'):
ts = r.iocs['timing_signature']
out += f"- Beacon Interval: ~{ts.get('mean_interval', 0):.1f}s (CV: {ts.get('interval_cv', 0):.2f})\n"
return out
except Exception as e:
return f"Error: {e}"
with gr.Blocks(title="C2Sentinel") as demo:
gr.Markdown("## C2Sentinel - Beacon Detection\n[Docs](https://huggingface.co/danielostrow/c2sentinel)")
with gr.Row():
with gr.Column(scale=2):
file_in = gr.File(label="Log File (drag & drop or click)", type="filepath")
text_in = gr.Textbox(label="Or Paste Logs", lines=5, placeholder="Paste log content here...")
ex_in = gr.Dropdown(choices=[""] + list(EXAMPLES.keys()), value="", label="Example")
with gr.Column(scale=1):
thresh = gr.Slider(0.3, 0.8, 0.5, label="Threshold")
wl = gr.Textbox(label="Whitelist", placeholder="8.8.8.8")
bl = gr.Textbox(label="Blacklist", placeholder="10.10.10.10")
btn = gr.Button("Analyze", variant="primary")
out = gr.Markdown()
btn.click(analyze, [text_in, file_in, ex_in, thresh, wl, bl], out)
demo.launch(server_name="0.0.0.0", server_port=7860)