#!/usr/bin/env python3 """C2Sentinel Demo""" import gradio as gr import json import re from datetime import datetime from huggingface_hub import hf_hub_download # Load model from HuggingFace (force_download ensures latest version) hf_hub_download(repo_id="danielostrow/c2sentinel", filename="c2sentinel.py", local_dir=".", force_download=True) hf_hub_download(repo_id="danielostrow/c2sentinel", filename="c2_sentinel.safetensors", local_dir=".", force_download=True) hf_hub_download(repo_id="danielostrow/c2sentinel", filename="c2_sentinel.json", local_dir=".", force_download=True) hf_hub_download(repo_id="danielostrow/c2sentinel", filename="normalization_params.npz", local_dir=".", force_download=True) from c2sentinel import C2Sentinel sentinel = C2Sentinel.load('c2_sentinel') EXAMPLES = { "C2 Beacon": [{"timestamp": 1705600000+i*60, "dst_ip": "45.33.32.156", "dst_port": 443, "bytes_sent": 200, "bytes_recv": 500} for i in range(8)], "Metasploit": [{"timestamp": 1705600000+i*30, "dst_ip": "10.10.10.10", "dst_port": 4444, "bytes_sent": 150, "bytes_recv": 300} for i in range(6)], "SSH Keepalive": [{"timestamp": 1705600000+i*30, "dst_ip": "192.168.1.10", "dst_port": 22, "bytes_sent": 48, "bytes_recv": 48} for i in range(6)], "Web Traffic": [{"timestamp": 1705600000+i*5, "dst_ip": f"93.184.{i}.34", "dst_port": 443, "bytes_sent": 500+i*100, "bytes_recv": 15000+i*5000} for i in range(5)], } def parse_ts(s): if not s: return None try: t = float(s) return t/1000 if t > 1e12 else t except: pass for f in ["%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S", "%b %d %H:%M:%S"]: try: dt = datetime.strptime(str(s).strip(), f) if dt.year == 1900: dt = dt.replace(year=2026) return dt.timestamp() except: pass return None def parse_logs(content): conns = [] for line in content.strip().split('\n'): line = line.strip() if not line or line.startswith('#'): continue c = None # JSON try: o = json.loads(line) c = { 'timestamp': parse_ts(o.get('timestamp') or o.get('ts') or o.get('@timestamp') or o.get('time')), 'dst_ip': o.get('dst_ip') or o.get('dest_ip') or o.get('id.resp_h') or o.get('DestinationIP') or o.get('dst'), 'dst_port': o.get('dst_port') or o.get('dest_port') or o.get('id.resp_p') or o.get('DestinationPort') or o.get('dport'), 'bytes_sent': o.get('bytes_sent') or o.get('orig_bytes') or o.get('SentBytes') or 100, 'bytes_recv': o.get('bytes_recv') or o.get('resp_bytes') or o.get('ReceivedBytes') or 100, } except: pass # Zeek if not c: p = line.split('\t') if len(p) >= 10: try: c = {'timestamp': parse_ts(p[0]), 'dst_ip': p[4] if p[4]!='-' else None, 'dst_port': int(p[5]) if p[5]!='-' else 443, 'bytes_sent': int(p[9]) if p[9]!='-' else 100, 'bytes_recv': int(p[10]) if len(p)>10 and p[10]!='-' else 100} except: pass # Syslog if not c: m = re.match(r'^(\w{3}\s+\d+\s+\d+:\d+:\d+)\s+\S+\s+\S+:\s*(.*)$', line) if m: ip = re.search(r'(\d+\.\d+\.\d+\.\d+)', m.group(2)) if ip: c = {'timestamp': parse_ts(m.group(1)), 'dst_ip': ip.group(1), 'dst_port': 443, 'bytes_sent': 100, 'bytes_recv': 100} # Key=value if not c and '=' in line: kv = dict(re.findall(r'(\w+)=(\S+)', line)) ip = kv.get('DestAddress') or kv.get('DestinationIp') or kv.get('dst') or kv.get('RemoteAddress') if ip: c = {'timestamp': parse_ts(kv.get('TimeGenerated') or kv.get('EventTime')) or datetime.now().timestamp(), 'dst_ip': ip, 'dst_port': int(kv.get('DestPort', 443)), 'bytes_sent': 100, 'bytes_recv': 100} # CSV if not c and ',' in line: for p in line.split(','): ip = re.search(r'(\d+\.\d+\.\d+\.\d+)', p) if ip: c = {'timestamp': datetime.now().timestamp(), 'dst_ip': ip.group(1), 'dst_port': 443, 'bytes_sent': 100, 'bytes_recv': 100} break # Fallback if not c: ip = re.search(r'(\d+\.\d+\.\d+\.\d+)', line) if ip: c = {'timestamp': datetime.now().timestamp(), 'dst_ip': ip.group(1), 'dst_port': 443, 'bytes_sent': 100, 'bytes_recv': 100} if c and c.get('dst_ip') and c.get('timestamp'): c['dst_port'] = int(c.get('dst_port') or 443) c['bytes_sent'] = int(c.get('bytes_sent') or 100) c['bytes_recv'] = int(c.get('bytes_recv') or 100) conns.append(c) return conns def analyze(text, file, example, thresh, wl, bl): try: sentinel.whitelist_ips = set() sentinel.blacklist_ips = set() if wl: sentinel.add_whitelist(ips=[x.strip() for x in wl.split(',') if x.strip()]) if bl: sentinel.add_blacklist(ips=[x.strip() for x in bl.split(',') if x.strip()]) conns = [] if file: with open(file, 'r', encoding='utf-8', errors='ignore') as f: content = f.read() conns = parse_logs(content) if not conns: try: conns = json.loads(content) except: pass if not conns and example in EXAMPLES: conns = EXAMPLES[example] if not conns and text: conns = parse_logs(text) if not conns: try: conns = json.loads(text) except: pass if not conns: return "No valid connections found" if len(conns) < 3: return f"Need 3+ connections (found {len(conns)})" r = sentinel.analyze(conns, threshold=thresh) out = f"### {'🚨 C2 DETECTED: '+r.c2_type if r.is_c2 else '✅ No C2 Detected'}\n\n" out += f"**Probability:** {r.c2_probability:.0%} | **Confidence:** {r.confidence:.0%} | **Connections:** {r.connections_analyzed}\n\n" if r.matched_legitimate_pattern: out += f"**Matched Pattern:** {r.matched_legitimate_pattern}\n\n" if r.risk_factors: out += "**Risk Factors:**\n" for rf in r.risk_factors[:5]: out += f"- {rf}\n" out += "\n" if r.mitigating_factors: out += "**Mitigating Factors:**\n" for mf in r.mitigating_factors[:3]: out += f"- {mf}\n" out += "\n" # Show destination summary if r.destination_summary: out += "**Destinations:**\n" for dest, count in list(r.destination_summary.get('destinations', {}).items())[:5]: out += f"- `{dest}` ({count} connections)\n" out += "\n" # Show time range if r.time_range: duration = r.time_range.get('duration', 0) out += f"**Time Span:** {duration:.0f} seconds\n\n" # If C2 detected, show the flagged connections if r.is_c2 and r.suspicious_connections: out += "**Flagged Connections:**\n```\n" out += f"{'#':<3} {'Timestamp':<12} {'Destination':<22} {'Sent':<8} {'Recv':<8}\n" out += "-" * 55 + "\n" for sc in r.suspicious_connections[:10]: ts = sc.get('timestamp', 0) ts_str = str(int(ts))[-6:] if ts else "N/A" dst = f"{sc.get('dst_ip', '?')}:{sc.get('dst_port', '?')}" out += f"{sc.get('index', 0):<3} {ts_str:<12} {dst:<22} {sc.get('bytes_sent', 0):<8} {sc.get('bytes_recv', 0):<8}\n" if len(r.suspicious_connections) > 10: out += f"... and {len(r.suspicious_connections) - 10} more\n" out += "```\n\n" # Show IOCs for threat intel if r.is_c2 and r.iocs: out += "**IOCs for Threat Intel:**\n" out += f"- IPs: `{', '.join(r.iocs.get('ip_addresses', []))}`\n" out += f"- Ports: `{', '.join(map(str, r.iocs.get('ports', [])))}`\n" if r.iocs.get('timing_signature'): ts = r.iocs['timing_signature'] out += f"- Beacon Interval: ~{ts.get('mean_interval', 0):.1f}s (CV: {ts.get('interval_cv', 0):.2f})\n" return out except Exception as e: return f"Error: {e}" with gr.Blocks(title="C2Sentinel") as demo: gr.Markdown("## C2Sentinel - Beacon Detection\n[Docs](https://huggingface.co/danielostrow/c2sentinel)") with gr.Row(): with gr.Column(scale=2): file_in = gr.File(label="Log File (drag & drop or click)", type="filepath") text_in = gr.Textbox(label="Or Paste Logs", lines=5, placeholder="Paste log content here...") ex_in = gr.Dropdown(choices=[""] + list(EXAMPLES.keys()), value="", label="Example") with gr.Column(scale=1): thresh = gr.Slider(0.3, 0.8, 0.5, label="Threshold") wl = gr.Textbox(label="Whitelist", placeholder="8.8.8.8") bl = gr.Textbox(label="Blacklist", placeholder="10.10.10.10") btn = gr.Button("Analyze", variant="primary") out = gr.Markdown() btn.click(analyze, [text_in, file_in, ex_in, thresh, wl, bl], out) demo.launch(server_name="0.0.0.0", server_port=7860)