"""Core environment logic for the Fake Gang Detection RL environment.""" from __future__ import annotations import copy import json import random from pathlib import Path from typing import Any, Dict, List, Optional from models import ( AccountProfile, AccountStatus, FakeGangAction, FakeGangObservation, FakeGangState, ActionType, ) from server.generator import generate_episode, TASK_CONFIG from server.scoring import ( compute_node_risk, compute_behavior_risk, compute_graph_risk, compute_hub_legitimacy, compute_fake_risk, classify_risk, grader_score as _compute_grader_score, ) # Use the real OpenEnv Environment base class when the SDK is installed; # fall back to a plain object so the env works without it. try: from openenv.core.env_server import Environment as _OpenEnvBase # type: ignore except ImportError: class _OpenEnvBase: # type: ignore[no-redef] pass # --------------------------------------------------------------------------- # Environment # --------------------------------------------------------------------------- EPISODES_DIR = Path(__file__).parent.parent / "episodes" class FakeGangEnvironment(_OpenEnvBase): """OpenEnv-compatible environment for fake Instagram gang detection.""" SUPPORTS_CONCURRENT_SESSIONS = True def __init__(self) -> None: self._ep: Dict[str, Any] = {} self._accounts: Dict[str, Dict[str, Any]] = {} # id -> account dict self._live_edges: Dict[str, List[str]] = {} # id -> follows (mutable, affected by evasion) self._reverse_edges: Dict[str, List[str]] = {} # id -> who follows this id (kept in sync) self._gang_ids: List[str] = [] self._inspected: List[str] = [] self._flagged: List[str] = [] self._visible_ids: List[str] = [] # known to exist self._profiled: Dict[str, AccountProfile] = {} # fully revealed profiles self._account_statuses: Dict[str, str] = {} # id -> "normal"|"suspect"|"confirmed_fake" self._last_grader_score: float = 0.0 self._step_count: int = 0 self._max_steps: int = 30 self._task: str = "easy" self._evasion_count: int = 0 self._evasion_triggered: bool = False self._episode_id: str = "" self._done: bool = False self._score: float = 0.0 self._last_score: float = 0.0 # snapshot before current step; used to compute per-step reward delta self._seed: int = 0 # Round 2: Platform-specific state self._platform: str = "" self._policy: Optional[Any] = None # PlatformPolicy self._revealed_signals: Dict[str, Dict[str, Any]] = { "photo_reuse": {}, "bio_template": {}, "ip_cluster": {}, } # Round 2: tool-use shaping bookkeeping (per-episode) self._tool_calls: Dict[str, set] = { # action_type -> set(account_id) "reverse_image_search": set(), "analyze_bio": set(), "check_ip": set(), } self._policy_called_early: bool = False self._action_count: int = 0 # total step() calls including 0-cost ones self._decision_package: Dict[str, Any] = {} # ------------------------------------------------------------------ # reset # ------------------------------------------------------------------ def reset( self, task: str = "easy", episode_id: Optional[str] = None, seed: Optional[int] = None, platform: Optional[str] = None, **kwargs: Any, ) -> FakeGangObservation: self._task = task self._step_count = 0 self._evasion_count = 0 self._evasion_triggered = False # Clear evasion-fired flags from previous episodes for attr in [a for a in vars(self) if a.startswith('_fired_')]: delattr(self, attr) self._inspected = [] self._flagged = [] self._profiled = {} self._account_statuses = {} self._last_grader_score = 0.0 self._done = False self._score = 0.0 self._last_score = 0.0 # Load or generate episode if seed is None: seed = random.randint(0, 9999) self._seed = seed ep = self._load_episode(task, seed) self._ep = ep self._episode_id = ep["episode_id"] self._max_steps = ep["max_steps"] self._gang_ids = ep["gang_member_ids"] # Round 2: Load platform policy and initialize revealed signals. # Optional platform override lets callers swap the policy lens without # regenerating the episode (network/hidden-signals stay platform-agnostic). if platform is not None: ep["platform"] = platform self._platform = ep.get("platform", "Instagram") self._policy = self._load_policy(self._platform) self._revealed_signals = { "photo_reuse": {}, "bio_template": {}, "ip_cluster": {}, } self._tool_calls = { "reverse_image_search": set(), "analyze_bio": set(), "check_ip": set(), } self._policy_called_early = False self._action_count = 0 self._decision_package = {} # Build account map and live edges self._accounts = {a["id"]: a for a in ep["network"]["accounts"]} self._live_edges = { a["id"]: list(a["true_edges"]["follows"]) for a in ep["network"]["accounts"] } # Build reverse index: who follows each account (kept in sync with _live_edges) self._reverse_edges = {} for follower, targets in self._live_edges.items(): for target in targets: self._reverse_edges.setdefault(target, []).append(follower) # Initial visible IDs (not yet profiled) self._visible_ids = list(ep["starting_visible"]) return self._make_observation(message="Episode started. Investigate accounts to find the fake gang.") # ------------------------------------------------------------------ # step # ------------------------------------------------------------------ def step(self, action: FakeGangAction, **kwargs: Any) -> FakeGangObservation: if self._done: return self._make_observation(message="Episode is already over.") atype = action.action_type acc_id = action.account_id # Round 2: shaping — bonus when GET_POLICY is the very first action. # Recorded once per episode; surfaced inside _do_get_policy. self._action_count += 1 # Trigger evasion if due BEFORE processing the action self._maybe_trigger_evasion() if atype == ActionType.SUBMIT: return self._do_submit() if atype == ActionType.FLAG: return self._do_flag(acc_id) if atype == ActionType.UNFLAG: return self._do_unflag(acc_id) if atype == ActionType.INSPECT: return self._do_inspect(acc_id) if atype == ActionType.INVESTIGATE_NETWORK: return self._do_investigate(acc_id) # Round 2: New tool-call actions if atype == ActionType.REVERSE_IMAGE_SEARCH: return self._do_reverse_image_search(acc_id) if atype == ActionType.ANALYZE_BIO: return self._do_analyze_bio(acc_id) if atype == ActionType.CHECK_IP: return self._do_check_ip(acc_id) if atype == ActionType.GET_POLICY: return self._do_get_policy() return self._make_observation(message=f"Unknown action: {atype}") # ------------------------------------------------------------------ # state property # ------------------------------------------------------------------ @property def state(self) -> FakeGangState: return FakeGangState( episode_id=self._episode_id, step_count=self._step_count, task=self._task, score_so_far=self._score, evasion_count=self._evasion_count, network_size=len(self._accounts), gang_size=len(self._gang_ids), episode_seed=self._seed, platform=self._platform, # Round 2 ) # ------------------------------------------------------------------ # Action handlers # ------------------------------------------------------------------ def _do_inspect(self, acc_id: Optional[str]) -> FakeGangObservation: if acc_id is None or acc_id not in self._accounts: return self._make_observation(message=f"Cannot INSPECT: account '{acc_id}' not found.") self._step_count += 1 self._score -= 0.01 # time cost if acc_id not in self._inspected: self._inspected.append(acc_id) if acc_id not in self._visible_ids: self._visible_ids.append(acc_id) # Reveal profile self._profiled[acc_id] = self._build_profile(acc_id) # Reveal the accounts this one follows neighbors = self._live_edges.get(acc_id, []) for n in neighbors: if n not in self._visible_ids: self._visible_ids.append(n) # Check step limit if self._step_count >= self._max_steps: return self._do_submit(forced=True) return self._make_observation( message=f"Inspected {acc_id}. Found {len(neighbors)} outgoing connections." ) def _do_investigate(self, acc_id: Optional[str]) -> FakeGangObservation: if acc_id is None or acc_id not in self._accounts: return self._make_observation(message=f"Cannot INVESTIGATE_NETWORK: account '{acc_id}' not found.") self._step_count += 2 # costs 2 steps self._score -= 0.02 if acc_id not in self._inspected: self._inspected.append(acc_id) if acc_id not in self._visible_ids: self._visible_ids.append(acc_id) # Reveal neighbors AND their neighbors (2-hop), traversing BOTH follow directions. # Unidirectional (outgoing-only) expansion misses gang members who follow the target # but aren't followed back — with density=0.70 this leaves ~30% unreachable per hop. new_ids = set() def _add_visible(nid: str) -> None: if nid not in self._visible_ids: self._visible_ids.append(nid) new_ids.add(nid) # Outgoing: accounts that acc_id follows for n in self._live_edges.get(acc_id, []): _add_visible(n) for n2 in self._live_edges.get(n, []): _add_visible(n2) for n2 in self._reverse_edges.get(n, []): _add_visible(n2) # Incoming: accounts that follow acc_id (reverse edges) for n in self._reverse_edges.get(acc_id, []): _add_visible(n) for n2 in self._live_edges.get(n, []): _add_visible(n2) for n2 in self._reverse_edges.get(n, []): _add_visible(n2) # Re-cascade SUSPECT to newly visible accounts using two complementary signals: # # Signal 1 — follow-graph: newly visible accounts that a flagged account follows. # Survives post-evasion because it re-checks live_edges (already updated by evasion). for flagged_id in self._flagged: for neighbor in self._live_edges.get(flagged_id, []): if (neighbor in self._visible_ids and self._account_statuses.get(neighbor, "normal") == "normal"): self._account_statuses[neighbor] = "suspect" # # Signal 2 — IP cluster: newly revealed accounts sharing the same IP subnet as any # flagged account. This catches gang members connected via incoming follow edges that # evasion may have removed from live_edges. Zero false positives (gang: shared IP; # real/decoy: unique IP per account). flagged_ips = { self._accounts[fid]["features"].get("ip_cluster_id") for fid in self._flagged if fid in self._accounts } flagged_ips.discard(None) for new_id in new_ids: if new_id not in self._flagged and self._account_statuses.get(new_id, "normal") == "normal": vid_ip = self._accounts.get(new_id, {}).get("features", {}).get("ip_cluster_id") if vid_ip in flagged_ips: self._account_statuses[new_id] = "suspect" # Refresh profiles for already-inspected accounts whose status changed so that # Priority 3 in the rule engine sees updated fake_risk (not stale pre-cascade values). for inspected_id in list(self._inspected): new_status = self._account_statuses.get(inspected_id, "normal") if new_status != "normal" and inspected_id in self._profiled: cached_status = self._profiled[inspected_id].status.value if cached_status != new_status: self._profiled[inspected_id] = self._build_profile(inspected_id) if self._step_count >= self._max_steps: return self._do_submit(forced=True) return self._make_observation( message=f"Investigated network around {acc_id}. Discovered {len(new_ids)} new account IDs." ) def _do_flag(self, acc_id: Optional[str]) -> FakeGangObservation: if acc_id is None or acc_id not in self._accounts: return self._make_observation(message=f"Cannot FLAG: account '{acc_id}' not found.") # No-evidence guard: charge -0.15 and deny when the agent has neither # inspected the account nor invoked any reveal tool on it. has_evidence = ( acc_id in self._inspected or acc_id in self._tool_calls["reverse_image_search"] or acc_id in self._tool_calls["analyze_bio"] or acc_id in self._tool_calls["check_ip"] ) if not has_evidence: self._score -= 0.15 return self._make_observation( message=f"Cannot FLAG: no evidence gathered on {acc_id}." ) if acc_id not in self._flagged: self._flagged.append(acc_id) self._account_statuses[acc_id] = "confirmed_fake" # Cascade 1 — follow-graph: mark accounts that acc_id follows as SUSPECT. # Gang members follow each other (density 0.70+), so this is high-precision. for neighbor in self._live_edges.get(acc_id, []): if (neighbor in self._visible_ids and self._account_statuses.get(neighbor, "normal") == "normal"): self._account_statuses[neighbor] = "suspect" # Cascade 2 — IP cluster: any visible account sharing the same IP subnet is # a gang cohort. Gang: shared_ip_count=9, ip_cluster_id="ip_gang_". # Real/decoy: unique ip_cluster_id. Zero false positives. flagged_ip = self._accounts[acc_id]["features"].get("ip_cluster_id") if flagged_ip: for vid in self._visible_ids: if (vid not in self._flagged and self._account_statuses.get(vid, "normal") == "normal"): vid_ip = self._accounts.get(vid, {}).get("features", {}).get("ip_cluster_id") if vid_ip == flagged_ip: self._account_statuses[vid] = "suspect" # Refresh profiles for already-inspected accounts that FOLLOW acc_id, # because their flagged_neighbor_count just increased (risk score changes). for inspected_id in self._inspected: if acc_id in self._live_edges.get(inspected_id, []): self._profiled[inspected_id] = self._build_profile(inspected_id) return self._make_observation(message=f"Flagged {acc_id} as suspected fake.") def _do_unflag(self, acc_id: Optional[str]) -> FakeGangObservation: if acc_id is None: return self._make_observation(message="Cannot UNFLAG: no account_id provided.") if acc_id in self._flagged: self._flagged.remove(acc_id) self._account_statuses.pop(acc_id, None) return self._make_observation(message=f"Removed flag from {acc_id}.") # ------------------------------------------------------------------ # Round 2: New tool-call action handlers # ------------------------------------------------------------------ def _do_reverse_image_search(self, acc_id: Optional[str]) -> FakeGangObservation: """Reveal photo_reuse_score for account (costs 1 step).""" if acc_id is None or acc_id not in self._accounts: return self._make_observation(message=f"Cannot REVERSE_IMAGE_SEARCH: account '{acc_id}' not found.") # Shaping: penalize redundant tool reuse on same account if acc_id in self._tool_calls["reverse_image_search"]: self._score -= 0.05 self._tool_calls["reverse_image_search"].add(acc_id) self._step_count += 1 self._score -= 0.01 # time cost # Reveal from hidden store (or from features if old episode format) if "hidden_signals" in self._ep and "photo_reuse" in self._ep["hidden_signals"]: score = self._ep["hidden_signals"]["photo_reuse"].get(acc_id, 0.0) else: # Fallback: old episode format score = self._accounts[acc_id]["features"].get("photo_reuse_score", 0.0) self._revealed_signals["photo_reuse"][acc_id] = score # Update account features self._accounts[acc_id]["features"]["photo_reuse_score"] = score # Refresh profile if already inspected if acc_id in self._profiled: self._profiled[acc_id] = self._build_profile(acc_id) # Check step limit if self._step_count >= self._max_steps: return self._do_submit(forced=True) count = int(score * 100) # Simulate "found N matches" return self._make_observation( message=f"Reverse image search: {acc_id} has photo_reuse_score={score:.2f} (found {count} matching images)" ) def _do_analyze_bio(self, acc_id: Optional[str]) -> FakeGangObservation: """Reveal bio_template_score for account (costs 1 step).""" if acc_id is None or acc_id not in self._accounts: return self._make_observation(message=f"Cannot ANALYZE_BIO: account '{acc_id}' not found.") if acc_id in self._tool_calls["analyze_bio"]: self._score -= 0.05 self._tool_calls["analyze_bio"].add(acc_id) self._step_count += 1 self._score -= 0.01 # time cost # Reveal from hidden store (or from features if old episode format) if "hidden_signals" in self._ep and "bio_template" in self._ep["hidden_signals"]: score = self._ep["hidden_signals"]["bio_template"].get(acc_id, 0.0) else: # Fallback: old episode format score = self._accounts[acc_id]["features"].get("bio_template_score", 0.0) self._revealed_signals["bio_template"][acc_id] = score # Update account features self._accounts[acc_id]["features"]["bio_template_score"] = score # Refresh profile if already inspected if acc_id in self._profiled: self._profiled[acc_id] = self._build_profile(acc_id) # Check step limit if self._step_count >= self._max_steps: return self._do_submit(forced=True) count = int(score * 10) # Simulate "matches N templates" return self._make_observation( message=f"Bio analysis: {acc_id} has bio_template_score={score:.2f} (matches {count} known templates)" ) def _do_check_ip(self, acc_id: Optional[str]) -> FakeGangObservation: """Reveal ip_cluster_signal for account (costs 2 steps - expensive!).""" if acc_id is None or acc_id not in self._accounts: return self._make_observation(message=f"Cannot CHECK_IP: account '{acc_id}' not found.") if acc_id in self._tool_calls["check_ip"]: self._score -= 0.10 # CHECK_IP is expensive — heavier redundancy penalty self._tool_calls["check_ip"].add(acc_id) self._step_count += 2 # Higher cost (requires warrant/legal approval) self._score -= 0.02 # Reveal from hidden store (or from features if old episode format) if "hidden_signals" in self._ep and "ip_cluster" in self._ep["hidden_signals"]: cluster_id = self._ep["hidden_signals"]["ip_cluster"].get(acc_id, "") else: # Fallback: old episode format cluster_id = self._accounts[acc_id]["features"].get("ip_cluster_id", "") self._revealed_signals["ip_cluster"][acc_id] = cluster_id # Update account features self._accounts[acc_id]["features"]["ip_cluster_id"] = cluster_id # Count how many visible accounts share this cluster if "hidden_signals" in self._ep and "ip_cluster" in self._ep["hidden_signals"]: cluster_count = sum( 1 for vid in self._visible_ids if self._ep["hidden_signals"]["ip_cluster"].get(vid) == cluster_id ) else: # Fallback: old episode format cluster_count = sum( 1 for vid in self._visible_ids if self._accounts.get(vid, {}).get("features", {}).get("ip_cluster_id") == cluster_id ) # Refresh profile if already inspected if acc_id in self._profiled: self._profiled[acc_id] = self._build_profile(acc_id) # Check step limit if self._step_count >= self._max_steps: return self._do_submit(forced=True) return self._make_observation( message=f"IP check: {acc_id} shares cluster '{cluster_id}' with {cluster_count} visible accounts" ) def _do_get_policy(self) -> FakeGangObservation: """Return platform policy information (costs 0 steps).""" # Shaping: small bonus for calling GET_POLICY before any other action if not self._policy_called_early and self._action_count == 1: self._score += 0.20 self._policy_called_early = True if self._policy is None: return self._make_observation(message="Policy not available") policy_info = ( f"Platform: {self._policy.platform} | " f"Threshold: {self._policy.threshold:.3f} | " f"Primary Signal: {self._policy.primary_enforcement_signal} | " f"FP Penalty: {self._policy.fp_penalty_weight}x | " f"({self._policy.fn_cost_signal} fn_cost, {self._policy.fp_cost_signal} fp_cost)" ) return self._make_observation(message=f"Policy compiled: {policy_info}") # ------------------------------------------------------------------ def _do_submit(self, forced: bool = False) -> FakeGangObservation: self._done = True gang_set = set(self._gang_ids) flagged_set = set(self._flagged) tp = len(gang_set & flagged_set) fp = len(flagged_set - gang_set) fn = len(gang_set - flagged_set) # Round 2: Platform-specific FP penalty fp_penalty = self._policy.fp_penalty_weight if self._policy else 0.5 reward = tp * 1.0 - fp * fp_penalty - fn * 0.3 recall = tp / len(gang_set) if gang_set else 0.0 precision = tp / len(flagged_set) if flagged_set else 0.0 win_recall = self._ep.get("win_recall", 0.8) win_precision = self._ep.get("win_precision", 0.7) if recall >= win_recall and precision >= win_precision: reward += 5.0 # full win bonus if tp == len(gang_set): reward += 3.0 # perfect recall bonus elif recall >= win_recall: reward += 2.0 # partial win # Efficiency bonus steps_left = self._max_steps - self._step_count if not forced and steps_left >= self._max_steps * 0.5: reward += 1.0 # Round 2: Platform-specific bonus if self._policy: if self._policy.platform == "Instagram" and precision >= 0.95: reward += 2.0 # Instagram rewards high precision elif self._policy.platform == "Snapchat" and recall >= 0.95: reward += 2.0 # Snapchat rewards high recall # Evasion penalty (hard mode) if self._task == "hard": reward -= self._evasion_count * 1.0 if forced: reward -= 2.0 # ran out of steps # Round 2: insufficient-evidence penalty. # Flagging an account whose hidden signals were never revealed is # speculative; charge a small per-flag penalty. revealed_any = ( set(self._revealed_signals["photo_reuse"]) | set(self._revealed_signals["bio_template"]) | set(self._revealed_signals["ip_cluster"]) ) unsupported_flags = [f for f in self._flagged if f not in revealed_any] if unsupported_flags: reward -= 0.15 * len(unsupported_flags) self._score += reward # Round 2: Platform-aware grader score threshold = self._policy.threshold if self._policy else 0.35 self._last_grader_score = _compute_grader_score( tp, fp, fn, self._step_count, self._max_steps, threshold, fp_penalty ) won = recall >= win_recall and precision >= win_precision # Round 2: build moderation decision package. # The recommended action is policy-aware: precision-leaning platforms # (low FP penalty already paid for) escalate; recall-leaning platforms # batch-takedown when recall is high. if precision >= 0.95 and recall >= 0.9: recommended_action = "batch_takedown" elif precision >= 0.8 and recall >= 0.7: recommended_action = "scheduled_ban" elif precision >= 0.6: recommended_action = "temporary_hold" else: recommended_action = "queue_for_review" evidence_summary = { "flagged": list(self._flagged), "revealed_photo_reuse": len(self._revealed_signals["photo_reuse"]), "revealed_bio_template": len(self._revealed_signals["bio_template"]), "revealed_ip_cluster": len(self._revealed_signals["ip_cluster"]), "unsupported_flags": unsupported_flags, } policy_rationale = ( f"{self._platform} policy θ={threshold:.3f} " f"(primary={self._policy.primary_enforcement_signal if self._policy else '?'}, " f"FP penalty={fp_penalty}); " f"observed precision={precision:.2f}, recall={recall:.2f}." ) self._decision_package = { "platform": self._platform, "flagged_accounts": list(self._flagged), "recommended_action": recommended_action, "evidence_summary": evidence_summary, "policy_rationale": policy_rationale, "tp": tp, "fp": fp, "fn": fn, "precision": precision, "recall": recall, "reward": self._score, "grader_score": self._last_grader_score, } msg = ( f"{'[WIN] ' if won else '[LOSS] '}" f"TP={tp} FP={fp} FN={fn} " f"Recall={recall:.2f} Precision={precision:.2f} " f"Episode reward={self._score:.2f}\n" f"Decision: {recommended_action}\n" f"flagged_accounts: {json.dumps(list(self._flagged))}\n" f"evidence_summary: {json.dumps(evidence_summary)}\n" f"policy_rationale: {policy_rationale}\n" f"grader_score: {self._last_grader_score:.4f}" ) return self._make_observation(message=msg, terminal_reward=self._score) # ------------------------------------------------------------------ # Evasion # ------------------------------------------------------------------ def _maybe_trigger_evasion(self) -> None: for event in self._ep.get("evasion_schedule", []): if self._step_count >= event["step"] and not self._event_fired(event): self._fire_evasion(event) def _event_fired(self, event: Dict[str, Any]) -> bool: # Track which events have fired by step threshold key = f"_fired_{event['step']}" return getattr(self, key, False) def _fire_evasion(self, event: Dict[str, Any]) -> None: step_key = f"_fired_{event['step']}" setattr(self, step_key, True) self._evasion_count += 1 self._evasion_triggered = True if event["event"] == "unfollow_intragang": drop_rate = event.get("drop_rate", 0.5) rng = random.Random(self._seed + self._evasion_count) gang_set = set(self._gang_ids) for g in self._gang_ids: follows = self._live_edges.get(g, []) kept = [f for f in follows if f not in gang_set or rng.random() > drop_rate] dropped = set(follows) - set(kept) self._live_edges[g] = kept # Keep reverse_edges in sync: remove dropped edges for target in dropped: rev = self._reverse_edges.get(target, []) if g in rev: rev.remove(g) rename_count = event.get("rename_count", 0) if rename_count > 0: rng = random.Random(self._seed + self._evasion_count + 1000) targets = rng.sample(self._gang_ids, min(rename_count, len(self._gang_ids))) for t in targets: self._accounts[t]["features"]["name_change_count"] += 1 # Update profiled cache if already inspected if t in self._profiled: self._profiled[t] = self._build_profile(t) # ------------------------------------------------------------------ # Helpers # ------------------------------------------------------------------ def _compute_post_hour_cluster_score(self, acc_hour: float) -> float: """How closely does this account's posting hour match the flagged accounts' mean hour?""" if not self._flagged: return 0.0 hours = [ self._accounts[fid]["features"]["avg_post_hour"] for fid in self._flagged if fid in self._accounts ] if not hours: return 0.0 mean_h = sum(hours) / len(hours) diff = abs(acc_hour - mean_h) diff = min(diff, 24.0 - diff) # wrap-around distance on 24-hour clock return round(max(0.0, 1.0 - diff / 6.0), 4) def _compute_suspicious_mutual_ratio(self, acc_id: str, follows: List[str]) -> float: """Fraction of suspicious follows that also mutually follow this account.""" suspicious = [ fid for fid in follows if self._account_statuses.get(fid, "normal") in {"suspect", "confirmed_fake"} ] if not suspicious: return 0.0 mutual = [fid for fid in suspicious if acc_id in self._live_edges.get(fid, [])] return round(len(mutual) / len(suspicious), 4) def _build_profile(self, acc_id: str) -> AccountProfile: a = self._accounts[acc_id] f = a["features"] follows = list(self._live_edges.get(acc_id, [])) # ── Derived graph features (computed from live graph state at inspect time) ── # How many of this account's follows are already flagged? flagged_neighbor_count = sum(1 for fid in follows if fid in self._flagged) # Mutual follow rate: fraction of follows that also follow this account back. if follows: mutual_follow_rate = round( sum(1 for fid in follows if acc_id in self._live_edges.get(fid, [])) / len(follows), 4, ) else: mutual_follow_rate = 0.0 # Average photo_reuse_score among already-inspected neighbors. inspected_neighbors = [fid for fid in follows if fid in self._profiled] inspected_neighbor_count = len(inspected_neighbors) if inspected_neighbors: avg_neighbor_photo_reuse = round( sum(self._profiled[fid].photo_reuse_score for fid in inspected_neighbors) / inspected_neighbor_count, 4, ) else: avg_neighbor_photo_reuse = 0.0 # ── Full risk score computation ── post_hour_cluster_score = self._compute_post_hour_cluster_score(f["avg_post_hour"]) suspicious_mutual_ratio = self._compute_suspicious_mutual_ratio(acc_id, follows) flagged_neighbor_ratio = flagged_neighbor_count / max(inspected_neighbor_count, 1) # Round 2: Use 0.0 for signals that haven't been revealed yet photo_reuse = f.get("photo_reuse_score", 0.0) bio_template = f.get("bio_template_score", 0.0) node_risk = compute_node_risk(photo_reuse, bio_template) behavior_risk = compute_behavior_risk(f["account_age_days"], post_hour_cluster_score) graph_risk = compute_graph_risk(flagged_neighbor_ratio, mutual_follow_rate, avg_neighbor_photo_reuse) hub_legitimacy = compute_hub_legitimacy( f["follower_count"], f["following_count"], f["account_age_days"], suspicious_mutual_ratio, ) fake_risk = compute_fake_risk(node_risk, behavior_risk, graph_risk, hub_legitimacy) # Status: explicit (flagged/suspected) takes precedence over formula-derived formula_status = classify_risk(fake_risk) explicit_status = self._account_statuses.get(acc_id, "normal") final_status_str = explicit_status if explicit_status != "normal" else formula_status final_status = AccountStatus(final_status_str) return AccountProfile( account_id=acc_id, follower_count=f["follower_count"], following_count=f["following_count"], post_count=f["post_count"], avg_post_hour=f["avg_post_hour"], photo_reuse_score=photo_reuse, # Round 2: Use variable (0.0 if not revealed) bio_template_score=bio_template, # Round 2: Use variable (0.0 if not revealed) account_age_days=f["account_age_days"], name_change_count=f.get("name_change_count", 0), flagged_neighbor_count=flagged_neighbor_count, mutual_follow_rate=mutual_follow_rate, avg_neighbor_photo_reuse=avg_neighbor_photo_reuse, visible_follows=follows, status=final_status, fake_risk_score=fake_risk, node_risk=node_risk, behavior_risk=behavior_risk, graph_risk=graph_risk, hub_legitimacy_score=hub_legitimacy, comment_repeat_score=f.get("comment_repeat_score", 0.0), shared_ip_count=f.get("shared_ip_count", 0), inspected_neighbor_count=inspected_neighbor_count, post_hour_cluster_score=post_hour_cluster_score, suspicious_mutual_ratio=suspicious_mutual_ratio, ) def _build_hint(self) -> str: """Generate actionable hints for the agent based on current state.""" hints = [] # Hint 1: Uninspected suspects (highest priority) suspect_ids = [ sid for sid in self._visible_ids if sid not in self._flagged and self._account_statuses.get(sid, "normal") == "suspect" ] uninspected_suspects = [s for s in suspect_ids if s not in self._inspected] if uninspected_suspects: hints.append(f"HINT: {len(uninspected_suspects)} SUSPECT accounts need inspection — INSPECT {uninspected_suspects[0]} next (auto-elevated by cascade, likely gang member).") # Hint 2: Unflagged accounts with strong fake signals unflagged_fakes = [] for acc_id in self._inspected: if acc_id in self._flagged: continue p = self._profiled.get(acc_id) if not p: continue if (p.shared_ip_count >= 5 or (p.photo_reuse_score >= 0.50 and p.bio_template_score >= 0.40 and p.hub_legitimacy_score < 0.70)): unflagged_fakes.append(acc_id) if unflagged_fakes and not uninspected_suspects: hints.append(f"HINT: FLAG {unflagged_fakes[0]} — strong fake signals detected (photo_reuse/bio_template/shared_ip). FLAG is FREE (costs 0 steps).") # Hint 3: Submit reminder steps_left = max(0, self._max_steps - self._step_count) if len(self._flagged) >= 10: hints.append("HINT: You have 10 flags — SUBMIT now to end the episode and get scored.") elif steps_left <= 3 and not self._done: hints.append(f"HINT: Only {steps_left} steps left — consider SUBMIT to lock in your score.") return " ".join(hints) def _make_observation( self, message: str = "", terminal_reward: Optional[float] = None, ) -> FakeGangObservation: # Append hints to message for agent guidance hint = self._build_hint() if not self._done else "" full_message = f"{message} {hint}".strip() if hint else message # Per-step reward delta. On terminal step, use the explicit terminal_reward # (final episode reward). Otherwise, compute the score delta vs. the snapshot # taken before this action, so every step returns a float reward (never None). if terminal_reward is not None: step_reward: Optional[float] = terminal_reward else: step_reward = round(self._score - self._last_score, 4) self._last_score = self._score # Build a profile for every visible account, not just inspected ones. # _build_profile reads from features (which tool actions update on reveal), # so uninspected accounts get a profile with hidden signals at 0.0 by default # and revealed values once the corresponding tool has been called. visible_profiles = [] for vid in self._visible_ids: if vid in self._profiled: visible_profiles.append(self._profiled[vid]) elif vid in self._accounts: visible_profiles.append(self._build_profile(vid)) return FakeGangObservation( done=self._done, reward=step_reward, visible_accounts=visible_profiles, visible_account_ids=list(self._visible_ids), flagged_ids=list(self._flagged), inspected_ids=list(self._inspected), graph_edges={ acc_id: list(self._live_edges.get(acc_id, [])) for acc_id in self._inspected }, steps_remaining=max(0, self._max_steps - self._step_count), evasion_triggered=self._evasion_triggered, evasion_count=self._evasion_count, task=self._task, message=full_message, suspect_ids=[ sid for sid in self._visible_ids if sid not in self._flagged and self._account_statuses.get(sid, "normal") == "suspect" ], platform=self._platform, # Round 2: Pass platform to observation ) def _load_episode(self, task: str, seed: int) -> Dict[str, Any]: """Load pre-generated episode JSON or generate on the fly.""" fname = EPISODES_DIR / f"{task}_{seed:03d}.json" if fname.exists(): return json.loads(fname.read_text()) # Generate on the fly and cache ep = generate_episode(task, seed) EPISODES_DIR.mkdir(parents=True, exist_ok=True) fname.write_text(json.dumps(ep, indent=2)) return ep def _load_policy(self, platform: str) -> Optional[Any]: """Load platform policy from compiler.""" try: from server.policy_compiler import get_policy return get_policy(platform) except Exception as e: print(f"Warning: Failed to load policy for {platform}: {e}") # Return a minimal fallback policy from models import PlatformPolicy return PlatformPolicy( platform=platform, threshold=0.35, # Default threshold base_rate=0.005, fn_cost_signal="high", fp_cost_signal="medium", harm_weight=1.0, primary_enforcement_signal="photo_reuse", fp_penalty_weight=0.5, sources=[], confidence=0.0, compiled_at="" )