| from server.firewall_environment import FirewallEnvironment |
| from server.graders import run_deterministic_grade |
|
|
| def new_heuristic_policy(env, session_ids): |
| threat_intel = env.get_threat_intelligence() |
| known_bad_ports = set(threat_intel.get("known_bad_ports", [])) |
| actions = {} |
|
|
| for sid in session_ids: |
| try: |
| data = env.evaluate_session(sid) |
| except KeyError: |
| actions[sid] = 0 |
| continue |
|
|
| features = data.get("features", {}) |
| if data.get("revealed_malicious") is True: |
| actions[sid] = 1 |
| continue |
|
|
| dst_port = int(features.get("dst_port", 0)) |
| history = float(features.get("session_history_score", 1.0)) |
| entropy = float(features.get("entropy_score", 0.0)) |
| reuse = float(features.get("connection_reuse", 1.0)) |
| self_signed = int(features.get("is_self_signed", 0)) |
| ja3 = int(features.get("ja3_hash_cluster", 0)) |
| geo = float(features.get("geo_distance", 0.0)) |
| cert_valid = float(features.get("cert_validity_days", 999.0)) |
| tls_ver = int(features.get("tls_version", 1)) |
| dns_q = int(features.get("dns_query_count", 0)) |
| dur = float(features.get("duration_ms", 500.0)) |
| pkts = int(features.get("packet_count", 10)) |
|
|
| if ja3 >= 130: |
| actions[sid] = 1 |
| elif dst_port in known_bad_ports and reuse < 0.4: |
| actions[sid] = 1 |
| elif self_signed == 1 and reuse < 0.5: |
| actions[sid] = 5 |
| elif reuse < 0.45 and dns_q >= 3: |
| actions[sid] = 1 |
| elif dur < 100.0 and pkts > 50 and reuse < 0.2: |
| actions[sid] = 4 |
| elif reuse < 0.55 and dns_q >= 3: |
| actions[sid] = 2 |
| elif cert_valid < 250.0 and reuse < 0.6: |
| actions[sid] = 2 |
| elif entropy > 0.55 and reuse < 0.5: |
| actions[sid] = 2 |
| else: |
| actions[sid] = 0 |
|
|
| return actions |
|
|
| for task in ['easy', 'medium', 'hard']: |
| env = FirewallEnvironment(seed=303) |
| res = run_deterministic_grade(env, task, new_heuristic_policy) |
| print(f"{task}: score={res['score']:.4f} det={res['breakdown']['detection_rate']:.4f} fp_comp={res['breakdown']['fp_complement']:.4f} eff={res['breakdown']['efficiency']:.4f}") |
|
|