Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """C2Sentinel Demo""" | |
| import gradio as gr | |
| import json | |
| import re | |
| from datetime import datetime | |
| from huggingface_hub import hf_hub_download | |
| # Load model from HuggingFace (force_download ensures latest version) | |
| hf_hub_download(repo_id="danielostrow/c2sentinel", filename="c2sentinel.py", local_dir=".", force_download=True) | |
| hf_hub_download(repo_id="danielostrow/c2sentinel", filename="c2_sentinel.safetensors", local_dir=".", force_download=True) | |
| hf_hub_download(repo_id="danielostrow/c2sentinel", filename="c2_sentinel.json", local_dir=".", force_download=True) | |
| hf_hub_download(repo_id="danielostrow/c2sentinel", filename="normalization_params.npz", local_dir=".", force_download=True) | |
| from c2sentinel import C2Sentinel | |
| sentinel = C2Sentinel.load('c2_sentinel') | |
| EXAMPLES = { | |
| "C2 Beacon": [{"timestamp": 1705600000+i*60, "dst_ip": "45.33.32.156", "dst_port": 443, "bytes_sent": 200, "bytes_recv": 500} for i in range(8)], | |
| "Metasploit": [{"timestamp": 1705600000+i*30, "dst_ip": "10.10.10.10", "dst_port": 4444, "bytes_sent": 150, "bytes_recv": 300} for i in range(6)], | |
| "SSH Keepalive": [{"timestamp": 1705600000+i*30, "dst_ip": "192.168.1.10", "dst_port": 22, "bytes_sent": 48, "bytes_recv": 48} for i in range(6)], | |
| "Web Traffic": [{"timestamp": 1705600000+i*5, "dst_ip": f"93.184.{i}.34", "dst_port": 443, "bytes_sent": 500+i*100, "bytes_recv": 15000+i*5000} for i in range(5)], | |
| } | |
| def parse_ts(s): | |
| if not s: return None | |
| try: | |
| t = float(s) | |
| return t/1000 if t > 1e12 else t | |
| except: pass | |
| for f in ["%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S", "%b %d %H:%M:%S"]: | |
| try: | |
| dt = datetime.strptime(str(s).strip(), f) | |
| if dt.year == 1900: dt = dt.replace(year=2026) | |
| return dt.timestamp() | |
| except: pass | |
| return None | |
| def parse_logs(content): | |
| conns = [] | |
| for line in content.strip().split('\n'): | |
| line = line.strip() | |
| if not line or line.startswith('#'): continue | |
| c = None | |
| # JSON | |
| try: | |
| o = json.loads(line) | |
| c = { | |
| 'timestamp': parse_ts(o.get('timestamp') or o.get('ts') or o.get('@timestamp') or o.get('time')), | |
| 'dst_ip': o.get('dst_ip') or o.get('dest_ip') or o.get('id.resp_h') or o.get('DestinationIP') or o.get('dst'), | |
| 'dst_port': o.get('dst_port') or o.get('dest_port') or o.get('id.resp_p') or o.get('DestinationPort') or o.get('dport'), | |
| 'bytes_sent': o.get('bytes_sent') or o.get('orig_bytes') or o.get('SentBytes') or 100, | |
| 'bytes_recv': o.get('bytes_recv') or o.get('resp_bytes') or o.get('ReceivedBytes') or 100, | |
| } | |
| except: pass | |
| # Zeek | |
| if not c: | |
| p = line.split('\t') | |
| if len(p) >= 10: | |
| try: | |
| c = {'timestamp': parse_ts(p[0]), 'dst_ip': p[4] if p[4]!='-' else None, 'dst_port': int(p[5]) if p[5]!='-' else 443, | |
| 'bytes_sent': int(p[9]) if p[9]!='-' else 100, 'bytes_recv': int(p[10]) if len(p)>10 and p[10]!='-' else 100} | |
| except: pass | |
| # Syslog | |
| if not c: | |
| m = re.match(r'^(\w{3}\s+\d+\s+\d+:\d+:\d+)\s+\S+\s+\S+:\s*(.*)$', line) | |
| if m: | |
| ip = re.search(r'(\d+\.\d+\.\d+\.\d+)', m.group(2)) | |
| if ip: | |
| c = {'timestamp': parse_ts(m.group(1)), 'dst_ip': ip.group(1), 'dst_port': 443, 'bytes_sent': 100, 'bytes_recv': 100} | |
| # Key=value | |
| if not c and '=' in line: | |
| kv = dict(re.findall(r'(\w+)=(\S+)', line)) | |
| ip = kv.get('DestAddress') or kv.get('DestinationIp') or kv.get('dst') or kv.get('RemoteAddress') | |
| if ip: | |
| c = {'timestamp': parse_ts(kv.get('TimeGenerated') or kv.get('EventTime')) or datetime.now().timestamp(), | |
| 'dst_ip': ip, 'dst_port': int(kv.get('DestPort', 443)), 'bytes_sent': 100, 'bytes_recv': 100} | |
| # CSV | |
| if not c and ',' in line: | |
| for p in line.split(','): | |
| ip = re.search(r'(\d+\.\d+\.\d+\.\d+)', p) | |
| if ip: | |
| c = {'timestamp': datetime.now().timestamp(), 'dst_ip': ip.group(1), 'dst_port': 443, 'bytes_sent': 100, 'bytes_recv': 100} | |
| break | |
| # Fallback | |
| if not c: | |
| ip = re.search(r'(\d+\.\d+\.\d+\.\d+)', line) | |
| if ip: | |
| c = {'timestamp': datetime.now().timestamp(), 'dst_ip': ip.group(1), 'dst_port': 443, 'bytes_sent': 100, 'bytes_recv': 100} | |
| if c and c.get('dst_ip') and c.get('timestamp'): | |
| c['dst_port'] = int(c.get('dst_port') or 443) | |
| c['bytes_sent'] = int(c.get('bytes_sent') or 100) | |
| c['bytes_recv'] = int(c.get('bytes_recv') or 100) | |
| conns.append(c) | |
| return conns | |
| def analyze(text, file, example, thresh, wl, bl): | |
| try: | |
| sentinel.whitelist_ips = set() | |
| sentinel.blacklist_ips = set() | |
| if wl: sentinel.add_whitelist(ips=[x.strip() for x in wl.split(',') if x.strip()]) | |
| if bl: sentinel.add_blacklist(ips=[x.strip() for x in bl.split(',') if x.strip()]) | |
| conns = [] | |
| if file: | |
| with open(file, 'r', encoding='utf-8', errors='ignore') as f: | |
| content = f.read() | |
| conns = parse_logs(content) | |
| if not conns: | |
| try: conns = json.loads(content) | |
| except: pass | |
| if not conns and example in EXAMPLES: | |
| conns = EXAMPLES[example] | |
| if not conns and text: | |
| conns = parse_logs(text) | |
| if not conns: | |
| try: conns = json.loads(text) | |
| except: pass | |
| if not conns: return "No valid connections found" | |
| if len(conns) < 3: return f"Need 3+ connections (found {len(conns)})" | |
| r = sentinel.analyze(conns, threshold=thresh) | |
| out = f"### {'🚨 C2 DETECTED: '+r.c2_type if r.is_c2 else '✅ No C2 Detected'}\n\n" | |
| out += f"**Probability:** {r.c2_probability:.0%} | **Confidence:** {r.confidence:.0%} | **Connections:** {r.connections_analyzed}\n\n" | |
| if r.matched_legitimate_pattern: | |
| out += f"**Matched Pattern:** {r.matched_legitimate_pattern}\n\n" | |
| if r.risk_factors: | |
| out += "**Risk Factors:**\n" | |
| for rf in r.risk_factors[:5]: | |
| out += f"- {rf}\n" | |
| out += "\n" | |
| if r.mitigating_factors: | |
| out += "**Mitigating Factors:**\n" | |
| for mf in r.mitigating_factors[:3]: | |
| out += f"- {mf}\n" | |
| out += "\n" | |
| # Show destination summary | |
| if r.destination_summary: | |
| out += "**Destinations:**\n" | |
| for dest, count in list(r.destination_summary.get('destinations', {}).items())[:5]: | |
| out += f"- `{dest}` ({count} connections)\n" | |
| out += "\n" | |
| # Show time range | |
| if r.time_range: | |
| duration = r.time_range.get('duration', 0) | |
| out += f"**Time Span:** {duration:.0f} seconds\n\n" | |
| # If C2 detected, show the flagged connections | |
| if r.is_c2 and r.suspicious_connections: | |
| out += "**Flagged Connections:**\n```\n" | |
| out += f"{'#':<3} {'Timestamp':<12} {'Destination':<22} {'Sent':<8} {'Recv':<8}\n" | |
| out += "-" * 55 + "\n" | |
| for sc in r.suspicious_connections[:10]: | |
| ts = sc.get('timestamp', 0) | |
| ts_str = str(int(ts))[-6:] if ts else "N/A" | |
| dst = f"{sc.get('dst_ip', '?')}:{sc.get('dst_port', '?')}" | |
| out += f"{sc.get('index', 0):<3} {ts_str:<12} {dst:<22} {sc.get('bytes_sent', 0):<8} {sc.get('bytes_recv', 0):<8}\n" | |
| if len(r.suspicious_connections) > 10: | |
| out += f"... and {len(r.suspicious_connections) - 10} more\n" | |
| out += "```\n\n" | |
| # Show IOCs for threat intel | |
| if r.is_c2 and r.iocs: | |
| out += "**IOCs for Threat Intel:**\n" | |
| out += f"- IPs: `{', '.join(r.iocs.get('ip_addresses', []))}`\n" | |
| out += f"- Ports: `{', '.join(map(str, r.iocs.get('ports', [])))}`\n" | |
| if r.iocs.get('timing_signature'): | |
| ts = r.iocs['timing_signature'] | |
| out += f"- Beacon Interval: ~{ts.get('mean_interval', 0):.1f}s (CV: {ts.get('interval_cv', 0):.2f})\n" | |
| return out | |
| except Exception as e: | |
| return f"Error: {e}" | |
| with gr.Blocks(title="C2Sentinel") as demo: | |
| gr.Markdown("## C2Sentinel - Beacon Detection\n[Docs](https://huggingface.co/danielostrow/c2sentinel)") | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| file_in = gr.File(label="Log File (drag & drop or click)", type="filepath") | |
| text_in = gr.Textbox(label="Or Paste Logs", lines=5, placeholder="Paste log content here...") | |
| ex_in = gr.Dropdown(choices=[""] + list(EXAMPLES.keys()), value="", label="Example") | |
| with gr.Column(scale=1): | |
| thresh = gr.Slider(0.3, 0.8, 0.5, label="Threshold") | |
| wl = gr.Textbox(label="Whitelist", placeholder="8.8.8.8") | |
| bl = gr.Textbox(label="Blacklist", placeholder="10.10.10.10") | |
| btn = gr.Button("Analyze", variant="primary") | |
| out = gr.Markdown() | |
| btn.click(analyze, [text_in, file_in, ex_in, thresh, wl, bl], out) | |
| demo.launch(server_name="0.0.0.0", server_port=7860) | |