# Source: meta_ads_env/simulator.py, uploaded by TheAarvee05 with huggingface_hub
# (revision 11911b4, verified).
"""Campaign simulation engine with delayed causal attribution dynamics."""
from __future__ import annotations
import random
from typing import Dict, List, Tuple
from meta_ads_env.models import (
Action,
AdSetMetrics,
CampaignData,
EnvState,
PendingConversion,
Reward,
RewardComponents,
)
# ─── Attribution window coverage multipliers ─────────────────────────────────
# Fraction of true conversions that fall inside each window (industry averages).
# Keyed by attribution-window identifier. compute_reported_conversions() falls
# back to 0.72 for identifiers that are not listed here, and the
# "adjust_attribution_window" action only accepts keys present in this table.
WINDOW_COVERAGE: Dict[str, float] = {
    "1d_click": 0.30,
    "7d_click": 0.78,
    "7d_click_1d_view": 0.86,
    "28d_click": 0.92,
    "1d_view": 0.20,
}
# Integer per attribution window, keyed like WINDOW_COVERAGE.
# Presumably the window length in days (the key names suggest so) — the
# consumer is not visible in this part of the file; TODO confirm.
WINDOW_DAYS: Dict[str, int] = {
    "1d_click": 1,
    "7d_click": 7,
    "7d_click_1d_view": 7,
    "28d_click": 28,
    "1d_view": 1,
}
# Per-audience-segment funnel parameters.
# Presumably ctr = click-through rate, cvr = conversion rate and
# imp_per_usd = impressions bought per unit of spend — verify against the
# code that reads this table.
SEGMENT_PARAMS: Dict[str, Dict[str, float]] = {
    "retargeting": {"ctr": 0.027, "cvr": 0.11, "imp_per_usd": 780.0},
    "lookalike_1pct": {"ctr": 0.019, "cvr": 0.068, "imp_per_usd": 980.0},
    "lookalike_2pct": {"ctr": 0.017, "cvr": 0.056, "imp_per_usd": 1010.0},
    "broad_interest": {"ctr": 0.012, "cvr": 0.020, "imp_per_usd": 1120.0},
    "interest": {"ctr": 0.013, "cvr": 0.028, "imp_per_usd": 1080.0},
}
# Per-segment decay coefficient, keyed like SEGMENT_PARAMS.
# NOTE(review): what decays (and per which time unit) is not visible here —
# confirm against the code that applies it.
SEGMENT_DECAY: Dict[str, float] = {
    "retargeting": 0.006,
    "lookalike_1pct": 0.008,
    "lookalike_2pct": 0.009,
    "broad_interest": 0.014,
    "interest": 0.012,
}
# ─── Pixel signal quality sources ────────────────────────────────────────────
# Each active mitigation adds signal recovery
def compute_pixel_quality(
    ios_traffic_pct: float,
    conversions_api: bool,
    aem_enabled: bool,
    utm_tracking: bool,
) -> float:
    """
    Estimate pixel signal quality for a campaign.

    The pixel-only baseline shrinks linearly with the iOS traffic share
    (up to 70% of the signal is lost) and never drops below 0.15. Every
    enabled mitigation then adds a fixed amount back, and the sum is capped
    at 1.0.

    Args:
        ios_traffic_pct: iOS share of traffic as a fraction (0.5 == 50%).
        conversions_api: whether the Conversions API is enabled (+0.30).
        aem_enabled: whether Aggregated Event Measurement is enabled (+0.15).
        utm_tracking: whether UTM tracking is enabled (+0.05).

    Returns:
        Quality rounded to 4 decimals, at most 1.0.
    """
    # Pixel on its own: degraded by iOS traffic, floored at 15%.
    pixel_only = max(1.0 - (ios_traffic_pct * 0.70), 0.15)

    # (enabled?, recovered signal) for each mitigation, largest lever first.
    mitigations = (
        (conversions_api, 0.30),
        (aem_enabled, 0.15),
        (utm_tracking, 0.05),
    )
    recovered = 0.0
    for enabled, gain in mitigations:
        if enabled:
            recovered += gain

    return round(min(pixel_only + recovered, 1.0), 4)
def compute_server_signal_quality(
    conversions_api: bool,
    aem_enabled: bool,
    utm_tracking: bool,
) -> float:
    """
    Estimate server-side signal quality from the enabled tracking features.

    Starts from a 0.05 baseline and adds 0.55 for the Conversions API,
    0.18 for AEM and 0.08 for UTM tracking. The result is capped at 0.95
    and rounded to 4 decimals.
    """
    level = 0.05
    level += 0.55 if conversions_api else 0.0
    level += 0.18 if aem_enabled else 0.0
    level += 0.08 if utm_tracking else 0.0
    capped = min(level, 0.95)
    return round(capped, 4)
def compute_attribution_confidence(
    pixel_match_quality: float,
    capi_coverage: float,
    ios_traffic_pct: float,
) -> float:
    """
    Blend match quality, CAPI coverage and non-iOS share into one score.

    Weights are 0.4 (pixel match quality), 0.4 (CAPI coverage) and 0.2
    (one minus the iOS traffic fraction, with that fraction clamped to
    0..1). The blend is clamped to [0.0, 1.0] and rounded to 4 decimals.
    """
    # Clamp the iOS share on a percentage-point scale, then convert back.
    ios_points = min(max(ios_traffic_pct * 100.0, 0.0), 100.0)
    non_ios_share = 1.0 - ios_points / 100.0

    blended = (pixel_match_quality * 0.4) + (capi_coverage * 0.4) + (non_ios_share * 0.2)
    bounded = min(max(blended, 0.0), 1.0)
    return round(bounded, 4)
def compute_tracking_reliability(campaign: CampaignData, investigation_level: float) -> float:
    """
    Combine the campaign's signal sources into a tracking-reliability score.

    Pixel and server signal quality are mixed 70/30, that mix is blended
    80/20 with ``campaign.capi_coverage``, and up to 0.18 is added in
    proportion to ``investigation_level`` (capped at 1.0). The result is
    capped at 0.98 and rounded to 4 decimals.
    """
    # Pixel degrades with iOS while server-side partially recovers signal.
    signal_mix = campaign.pixel_signal_quality * 0.70 + campaign.server_signal_quality * 0.30
    with_coverage = (signal_mix * 0.80) + (campaign.capi_coverage * 0.20)
    investigation_boost = min(investigation_level, 1.0) * 0.18
    return round(min(with_coverage + investigation_boost, 0.98), 4)
def compute_reported_conversions(
    true_conversions: int,
    attribution_window: str,
    pixel_quality: float,
) -> int:
    """
    Turn ground-truth conversions into the count Meta Ads Manager would report.

    The true count is scaled by the attribution window's coverage factor
    (0.72 when the window is not in ``WINDOW_COVERAGE``) and by the pixel
    signal quality, truncated to an integer, and never returned below zero.
    """
    coverage = WINDOW_COVERAGE.get(attribution_window, 0.72)
    visible = int(true_conversions * coverage * pixel_quality)
    return max(visible, 0)
def compute_roas(conversions: int, avg_order_value: float, spend: float) -> float:
    """
    Return revenue divided by spend, rounded to 3 decimals.

    Revenue is ``conversions * avg_order_value``. A zero or negative spend
    yields 0.0 instead of dividing.
    """
    return 0.0 if spend <= 0 else round((conversions * avg_order_value) / spend, 3)
# ─── Ad-set helpers ──────────────────────────────────────────────────────────
def build_adsets(campaign: CampaignData, avg_order_value: float, seed: int = 42) -> List[AdSetMetrics]:
    """
    Return the ad sets already attached to ``campaign``.

    ``avg_order_value`` and ``seed`` are accepted but not used; the
    signature is kept for backward compatibility with external callers.
    """
    del avg_order_value, seed  # intentionally unused
    return campaign.adsets
# ─── Action application ──────────────────────────────────────────────────────
class SimulationEngine:
"""
Applies an Action to an EnvState and returns
(new_state, reward, done, info).
"""
def __init__(self, seed: int = 42):
self.seed = seed
self.rng = random.Random(seed)
def apply(
    self,
    state: EnvState,
    action: Action,
    avg_order_value: float = 75.0,
) -> Tuple[EnvState, Reward, bool, Dict]:
    """
    Apply one action to the environment and score the resulting step.

    The method works on ``state.model_copy(deep=True)``; within this method
    the incoming ``state`` is only read. One call performs, in order:

    1. snapshot of pre-action metrics (gap, signal, ROAS, momentum, ...);
    2. the action-specific mutation selected by ``action.action_type``;
    3. refresh of confidence and tracking-signal estimates;
    4. one simulated day via ``self._simulate_day``;
    5. reward assembly: immediate part, delayed-credit buffer release and
       an optional terminal bonus, followed by several flat adjustments;
    6. clamping of the total to [-0.2, 1.0], with extra bounds for
       ``no_op`` and a floor for ``investigate_attribution``;
    7. bookkeeping (counters, histories, risk log) and termination check.

    Args:
        state: Current environment state.
        action: Action to apply. ``action_type`` picks the branch,
            ``parameters`` supplies optional values ("scale", "window",
            "shifts", "roas_threshold", "amount", "strategy") and
            ``reasoning`` is logged when truthy.
        avg_order_value: Passed through to ``self._simulate_day``.

    Returns:
        ``(new_state, reward, done, info)``. ``info`` always holds
        "action_applied" and "effects" (list of message strings) and gets
        "risk_event" when the simulated day appended a new risk event.

    Note:
        ``self.rng`` is consumed on every call, so the outcome depends on
        the generator's current position as well as on the arguments.
    """
    new_state = state.model_copy(deep=True)
    c = new_state.campaign
    reward_components = RewardComponents()
    info: Dict = {"action_applied": action.action_type, "effects": []}
    # ── Snapshot before ──────────────────────────────────────────────
    # Pre-action values; the reward section compares them with post-step values.
    before_gap = _attribution_gap(c)
    before_signal = state.tracking_reliability
    before_roas = c.reported_roas
    before_momentum = state.growth_momentum
    before_issue_fraction = _issue_resolution_fraction(state)
    converged_before = _is_converged(state)
    # ── Apply action ─────────────────────────────────────────────────
    valid = True
    # Per-action-type usage counter (includes the current use).
    action_count = new_state.action_counts.get(action.action_type, 0) + 1
    new_state.action_counts[action.action_type] = action_count
    # Previous action: explicit field first, otherwise the last history entry.
    prev_action = new_state.last_action_type or (new_state.history[-1]["action"] if new_state.history else "")
    prev2_action = new_state.history[-2]["action"] if len(new_state.history) >= 2 else ""
    same_as_previous = bool(action.action_type) and (action.action_type == prev_action)
    # Length of the current run of identical consecutive actions.
    if same_as_previous:
        repeat_count = new_state.repeated_action_count + 1
    else:
        repeat_count = 1
    new_state.last_action_type = action.action_type
    new_state.repeated_action_count = repeat_count
    if action.action_type != "no_op":
        new_state.easy_meaningful_actions_taken += 1
    diminishing = _diminishing_returns(action_count)
    # Random ±10% factor applied to several action effects below.
    effect_scale = self.rng.uniform(0.90, 1.10)
    timing_bonus = 0.0
    uncertainty_bonus = 0.0
    # Never modified below; it enters timing_quality as a constant 0.0.
    delayed_release_bonus = 0.0
    stable_stack = _is_stack_stable(new_state)
    if action.action_type == "promote_ad":
        # Promotion is only accepted once the tracking stack is stable, reliable,
        # investigated and the attribution gap is small.
        promote_ready = (
            stable_stack
            and new_state.tracking_reliability >= 0.80
            and _attribution_gap(c) <= 0.18
            and new_state.tracking_investigated
        )
        if not promote_ready:
            valid = False
            timing_bonus = -0.18
            uncertainty_bonus = min(uncertainty_bonus, -0.04)
            new_state.growth_momentum = max(new_state.growth_momentum - 0.05, 0.45)
            info["effects"].append("Promotion blocked: system not yet optimized")
        else:
            lift = 0.18 * diminishing
            # NOTE(review): with the nesting shown here the three checks below cannot
            # fire, because promote_ready already requires tracking_reliability >= 0.80
            # and tracking_investigated — confirm the intended nesting.
            if new_state.tracking_reliability < 0.72:
                # Risky scaling under low confidence can backfire later.
                lift *= 0.35
                timing_bonus -= 0.12
                new_state.risk_events.append("early_scale_risk")
                info["effects"].append("Risk event: early scaling under low tracking confidence")
                new_state.budget_optimization_multiplier = max(new_state.budget_optimization_multiplier - 0.10, 0.80)
            if not new_state.tracking_investigated:
                lift *= 0.30
                timing_bonus -= 0.12
                uncertainty_bonus -= 0.08
                new_state.risk_events.append("promote_before_tracking_fix")
                info["effects"].append("Promotion before tracking fix reduced future conversion quality")
            if new_state.tracking_reliability < 0.60:
                valid = False
                lift = min(lift, 0.0)
                new_state.growth_momentum = max(new_state.growth_momentum - 0.08, 0.40)
                info["effects"].append("Promotion under low signal quality caused negative lift")
            new_state.growth_momentum = min(new_state.growth_momentum + lift, 1.8)
            info["effects"].append(f"Promotion lift applied (+{lift:.2f} momentum)")
            # NOTE(review): plain assignment — replaces any timing penalty set above.
            timing_bonus = 0.12
    elif action.action_type == "reduce_budget":
        # Scale factor is clamped to [0.60, 0.98]; paused ad sets keep their budget.
        scale = action.parameters.get("scale", 0.85)
        scale = min(max(float(scale), 0.60), 0.98)
        for ads in c.adsets:
            if not ads.is_paused:
                ads.budget = round(ads.budget * scale, 2)
        new_state.growth_momentum = max(new_state.growth_momentum - 0.12 * diminishing, 0.55)
        info["effects"].append(f"Budgets scaled by {scale:.2f}; growth momentum reduced")
        timing_bonus = -0.03 if new_state.day <= 2 else 0.04
        # Cutting budget on day <= 2 without investigation is recorded as an early mistake.
        if (not new_state.tracking_investigated) and new_state.day <= 2:
            new_state.early_wrong_decision = True
            timing_bonus -= 0.08
            info["effects"].append("Wrong early decision: budget reduced before attribution diagnosis")
    elif action.action_type == "investigate_attribution":
        # A repeat investigation is only useful after uncertainty was reintroduced.
        if new_state.tracking_investigated and not new_state.uncertainty_reintroduced:
            valid = False
            uncertainty_bonus = -0.06
            info["effects"].append("Redundant investigation (no new uncertainty)")
        else:
            gain = 0.22 * diminishing * effect_scale
            new_state.attribution_investigation_level = min(
                new_state.attribution_investigation_level + gain, 1.0
            )
            new_state.tracking_investigated = True
            new_state.uncertainty_reintroduced = False
            info["effects"].append(f"Attribution investigation depth +{gain:.2f}")
            uncertainty_bonus = 0.09
            if "tracking_investigated" not in new_state.issues_resolved:
                new_state.issues_resolved.append("tracking_investigated")
            # Investigating after an early budget mistake marks the episode as recovered.
            if new_state.early_wrong_decision:
                new_state.recovered_after_wrong_decision = True
                info["effects"].append("Recovery path activated after early budget misstep")
    elif action.action_type == "switch_to_modeled_conversions":
        if c.attribution_reporting_mode == "modeled":
            valid = False
            uncertainty_bonus = -0.03
            info["effects"].append("Modeled reporting already active")
        else:
            c.modeled_conversions_enabled = True
            c.attribution_reporting_mode = "modeled"
            new_state.growth_momentum = min(new_state.growth_momentum + (0.06 * diminishing * effect_scale), 1.9)
            # Move part of the already-tracked conversions into the modeled bucket so
            # that modeled conversions reach the target share (30% with AEM, else 22%).
            recovered_total = new_state.tracked_conversions_total + new_state.modeled_conversions_total
            if recovered_total > 0:
                target_modeled_share = 0.30 if c.aem_enabled else 0.22
                desired_modeled = int(round(recovered_total * target_modeled_share))
                reclassify = min(new_state.tracked_conversions_total, max(desired_modeled - new_state.modeled_conversions_total, 0))
                if reclassify > 0:
                    new_state.tracked_conversions_total -= reclassify
                    new_state.modeled_conversions_total += reclassify
                    info["effects"].append(f"Reclassified {reclassify} recovered conversions into modeled bucket")
            info["effects"].append("Reporting switched to modeled conversions")
            uncertainty_bonus = 0.06
            if (not c.conversions_api_enabled) or (not c.aem_enabled) or (not new_state.tracking_investigated):
                # Premature modeled mode leads to noisier optimization and weaker signal trust.
                timing_bonus -= 0.18
                uncertainty_bonus -= 0.12
                new_state.growth_momentum = max(new_state.growth_momentum - 0.06, 0.55)
                info["effects"].append("Modeled reporting switched too early; quality penalty applied")
            # Missing AEM is penalised on top of the generic "too early" penalty above.
            if not c.aem_enabled:
                uncertainty_bonus -= 0.12
                new_state.risk_events.append("modeled_before_aem")
                info["effects"].append("Modeled reporting before AEM introduced noisy signal")
            if "modeled_reporting" not in new_state.issues_resolved:
                new_state.issues_resolved.append("modeled_reporting")
    # NOTE(review): this starts a second `if` chain instead of continuing the `elif`
    # chain above. action_type is compared by equality, so at most one branch across
    # both chains can run.
    if action.action_type == "adjust_attribution_window":
        # Only a known window that differs from the current one counts as a change.
        window = action.parameters.get("window", "7d_click")
        if window in WINDOW_COVERAGE and window != c.attribution_window:
            c.attribution_window = window
            new_state.growth_momentum = min(new_state.growth_momentum + (0.05 * diminishing * effect_scale), 1.9)
            info["effects"].append(f"Attribution window changed to {window}")
            if "attribution_window" not in new_state.issues_resolved:
                new_state.issues_resolved.append("attribution_window")
        else:
            valid = False
            info["effects"].append("Invalid or unchanged attribution window")
    elif action.action_type == "enable_conversions_api":
        if not c.conversions_api_enabled:
            c.conversions_api_enabled = True
            new_state.growth_momentum = min(new_state.growth_momentum + (0.08 * diminishing * effect_scale), 1.9)
            info["effects"].append("Conversions API enabled")
            if "conversions_api" not in new_state.issues_resolved:
                new_state.issues_resolved.append("conversions_api")
        else:
            valid = False
            info["effects"].append("Conversions API already enabled")
    elif action.action_type == "enable_aggregated_event_measurement":
        if not c.aem_enabled:
            c.aem_enabled = True
            new_state.growth_momentum = min(new_state.growth_momentum + (0.07 * diminishing * effect_scale), 1.9)
            info["effects"].append("AEM enabled")
            if "aem" not in new_state.issues_resolved:
                new_state.issues_resolved.append("aem")
        else:
            valid = False
            info["effects"].append("AEM already enabled")
    elif action.action_type == "add_utm_tracking":
        if not c.utm_tracking:
            c.utm_tracking = True
            new_state.growth_momentum = min(new_state.growth_momentum + (0.04 * diminishing * effect_scale), 1.9)
            info["effects"].append("UTM parameters added")
            if "utm_tracking" not in new_state.issues_resolved:
                new_state.issues_resolved.append("utm_tracking")
        else:
            valid = False
            info["effects"].append("UTM tracking already enabled")
    elif action.action_type == "adjust_budget_allocation":
        # "shifts" maps adset_id -> new absolute budget; paused or unknown ad sets are ignored.
        shifts = action.parameters.get("shifts", {})
        moved_any = False
        for adset_id, new_budget in shifts.items():
            for ads in c.adsets:
                if ads.adset_id == adset_id and not ads.is_paused:
                    ads.budget = max(0.0, float(new_budget))
                    moved_any = True
                    # NOTE(review): the arrow in this message looks mis-encoded (likely "→").
                    info["effects"].append(f"Budget for {adset_id} β†’ ${new_budget}")
        if moved_any and "budget_allocation" not in new_state.issues_resolved:
            new_state.issues_resolved.append("budget_allocation")
        if not moved_any:
            valid = False
        # Small penalty while the "paused_bad_adsets" issue is still open.
        if "paused_bad_adsets" in new_state.issues_remaining and "paused_bad_adsets" not in new_state.issues_resolved:
            timing_bonus -= 0.05
        if moved_any and ((not new_state.tracking_investigated) or (not c.conversions_api_enabled)):
            timing_bonus -= 0.18
            new_state.growth_momentum = max(new_state.growth_momentum - 0.05, 0.55)
            new_state.risk_events.append("premature_budget_shift")
            info["effects"].append("Budget shift before attribution fixes reduced future efficiency")
        if moved_any:
            # Budget mix changes compound over future steps once tracking quality is usable.
            compounding_gain = 0.14 if new_state.tracking_investigated else 0.08
            dependency_scale = 1.0 if (new_state.tracking_investigated and c.conversions_api_enabled) else 0.35
            new_state.budget_optimization_multiplier = min(
                new_state.budget_optimization_multiplier + (compounding_gain * diminishing * dependency_scale * effect_scale),
                1.95,
            )
    elif action.action_type == "pause_underperforming_adsets":
        threshold = action.parameters.get("roas_threshold", 1.0)
        paused = []
        wasted_spend_cut = 0.0
        for ads in c.adsets:
            # Only pause when both observed and true performance are weak.
            reported_cutoff = max(float(threshold) - 0.25, 0.80)
            if ads.true_roas < float(threshold) and ads.reported_roas < reported_cutoff and not ads.is_paused:
                ads.is_paused = True
                paused.append(ads.adset_id)
                # 6% of each paused ad set's spend is credited back as avoided waste.
                wasted_spend_cut += ads.spent * 0.06
        info["effects"].append(f"Paused adsets: {paused}")
        if paused and "paused_bad_adsets" not in new_state.issues_resolved:
            new_state.issues_resolved.append("paused_bad_adsets")
        if paused:
            momentum_gain = min(0.04 + (0.02 * len(paused)), 0.12)
            new_state.growth_momentum = min(new_state.growth_momentum + momentum_gain, 1.9)
            c.budget_spent = max(c.budget_spent - wasted_spend_cut, 0.0)
            dependency_scale = 1.0 if (new_state.tracking_investigated and c.conversions_api_enabled) else 0.30
            new_state.budget_optimization_multiplier = min(
                new_state.budget_optimization_multiplier + (0.16 * diminishing * dependency_scale * effect_scale),
                1.95,
            )
            info["effects"].append(f"Waste reduction applied (${wasted_spend_cut:.0f})")
        if not paused:
            valid = False
    elif action.action_type == "reallocate_to_top_performers":
        active = [a for a in c.adsets if not a.is_paused]
        if len(active) < 2:
            valid = False
            info["effects"].append("Not enough active adsets to reallocate")
        else:
            # Donor/recipient are chosen by true ROAS (not by reported ROAS).
            top = max(active, key=lambda a: a.true_roas)
            low = min(active, key=lambda a: a.true_roas)
            realloc_amt = action.parameters.get("amount", 500.0)
            realloc_amt = float(realloc_amt)
            # At most 40% of the donor's budget is moved.
            donor_amt = min(realloc_amt, max(low.budget * 0.4, 0.0))
            low.budget = max(0.0, low.budget - donor_amt)
            top.budget += donor_amt
            # Reallocation has delayed compounding impact on future conversion potential.
            new_state.growth_momentum = min(new_state.growth_momentum + (0.09 * diminishing), 1.9)
            # NOTE(review): the message reports the requested amount (realloc_amt),
            # while the amount actually moved is donor_amt.
            info["effects"].append(f"Reallocated ${realloc_amt} to {top.adset_id}")
            if "budget_reallocation" not in new_state.issues_resolved:
                new_state.issues_resolved.append("budget_reallocation")
            if "budget_allocation" not in new_state.issues_resolved:
                new_state.issues_resolved.append("budget_allocation")
            if "paused_bad_adsets" in new_state.issues_remaining and "paused_bad_adsets" not in new_state.issues_resolved:
                timing_bonus -= 0.05
            # Without investigation + CAPI the move is penalised; otherwise it raises
            # the budget optimisation multiplier.
            if (not new_state.tracking_investigated) or (not c.conversions_api_enabled):
                timing_bonus -= 0.20
                new_state.growth_momentum = max(new_state.growth_momentum - 0.06, 0.55)
                new_state.risk_events.append("premature_reallocation")
                info["effects"].append("Premature reallocation penalty: attribution stack not ready")
            else:
                new_state.budget_optimization_multiplier = min(
                    new_state.budget_optimization_multiplier + (0.24 * diminishing * effect_scale),
                    1.95,
                )
    elif action.action_type == "change_bid_strategy":
        # Only logged and marked as resolved; no campaign field is changed here.
        strategy = action.parameters.get("strategy", "lowest_cost")
        # NOTE(review): the arrow in this message looks mis-encoded (likely "→").
        info["effects"].append(f"Bid strategy β†’ {strategy}")
        if "bid_strategy" not in new_state.issues_resolved:
            new_state.issues_resolved.append("bid_strategy")
    elif action.action_type == "segment_audience":
        # Only logged and marked as resolved; no campaign field is changed here.
        info["effects"].append("Audience segmentation applied")
        if "audience_segmentation" not in new_state.issues_resolved:
            new_state.issues_resolved.append("audience_segmentation")
    elif action.action_type == "no_op":
        info["effects"].append("No action taken")
        valid = False
    # Extra penalty for scaling/reporting actions taken before any investigation.
    if (not new_state.tracking_investigated) and action.action_type in {
        "promote_ad",
        "adjust_budget_allocation",
        "reallocate_to_top_performers",
        "switch_to_modeled_conversions",
    }:
        timing_bonus -= 0.08
        uncertainty_bonus -= 0.04
    if (not new_state.tracking_investigated) and (new_state.day <= 2) and action.action_type != "investigate_attribution":
        timing_bonus -= 0.07
        uncertainty_bonus -= 0.06
        # NOTE(review): both fields are reassigned unconditionally a few lines below,
        # so these two decrements do not appear to survive this step.
        new_state.attribution_confidence = max(new_state.attribution_confidence - 0.05, 0.0)
        new_state.confidence_score = max(new_state.confidence_score - 0.06, 0.0)
        info["effects"].append("Skipped investigation reduced diagnostic confidence")
    # Confidence estimate drives risk penalties when acting under poor observability.
    unresolved = len(set(new_state.issues_remaining) - set(new_state.issues_resolved))
    # CAPI coverage grows by 0.28 per step while CAPI is on, else decays by 0.03.
    c.capi_coverage = min(max(c.capi_coverage + (0.28 if c.conversions_api_enabled else -0.03), 0.0), 1.0)
    # Uses pixel_signal_quality / tracking_reliability as they were before the
    # refresh performed further below.
    c.pixel_match_quality = min(max((0.70 * c.pixel_signal_quality) + (0.30 * new_state.tracking_reliability), 0.0), 1.0)
    new_state.attribution_confidence = compute_attribution_confidence(
        pixel_match_quality=c.pixel_match_quality,
        capi_coverage=c.capi_coverage,
        ios_traffic_pct=c.ios_traffic_pct,
    )
    # 80% attribution confidence, 15% closeness of the attribution gap to zero,
    # 5% share of resolved issues (unresolved count normalised by 8).
    new_state.confidence_score = min(
        max(
            (0.80 * new_state.attribution_confidence)
            + (0.15 * (1.0 - _attribution_gap(c)))
            + (0.05 * (1.0 - min(unresolved / 8.0, 1.0))),
            0.0,
        ),
        1.0,
    )
    if new_state.confidence_score < 0.55 and action.action_type in {"promote_ad", "reallocate_to_top_performers", "adjust_budget_allocation"}:
        timing_bonus -= 0.06
        uncertainty_bonus -= 0.05
        new_state.risk_events.append("low_confidence_risk_penalty")
    # Long-term state updates from tracking stack.
    c.pixel_signal_quality = compute_pixel_quality(
        c.ios_traffic_pct,
        c.conversions_api_enabled,
        c.aem_enabled,
        c.utm_tracking,
    )
    c.server_signal_quality = compute_server_signal_quality(
        c.conversions_api_enabled,
        c.aem_enabled,
        c.utm_tracking,
    )
    new_state.tracking_reliability = compute_tracking_reliability(
        c,
        new_state.attribution_investigation_level,
    )
    # Add ±0.03 uniform noise and keep the value inside [0.15, 0.99].
    new_state.tracking_reliability = min(
        max(new_state.tracking_reliability + self.rng.uniform(-0.03, 0.03), 0.15),
        0.99,
    )
    # Causal daily transition: impressions -> clicks -> delayed conversions -> observed signals.
    risk_events_before = len(new_state.risk_events)
    self._simulate_day(new_state, avg_order_value)
    # Surface the most recent risk event that the simulated day appended, if any.
    if len(new_state.risk_events) > risk_events_before:
        new_events = new_state.risk_events[risk_events_before:]
        if new_events:
            info["risk_event"] = new_events[-1]
            info["effects"].append(f"Risk event observed: {new_events[-1]}")
    # ── Compute reward ────────────────────────────────────────────────
    after_gap = _attribution_gap(c)
    after_signal = new_state.tracking_reliability
    after_roas = c.reported_roas
    after_momentum = new_state.growth_momentum
    # Gap and signal deltas only count improvements; ROAS and momentum deltas are signed.
    gap_delta = max(before_gap - after_gap, 0.0)
    roas_delta = after_roas - before_roas
    sig_delta = max(after_signal - before_signal, 0.0)
    momentum_delta = after_momentum - before_momentum
    issue_progress = max(_issue_resolution_fraction(new_state) - before_issue_fraction, 0.0)
    # Larger early in the episode, never below 0.2.
    early_factor = max(0.2, 1.0 - (new_state.day / max(new_state.max_steps, 1)))
    # Share of the per-step release cap that was released last step, capped at 1.0.
    delayed_recovery = min(new_state.delayed_conversion_release_last_step / max(new_state.max_released_conversions_per_step, 1), 1.0)
    reward_components.attribution_accuracy = round(min(gap_delta * (0.80 + 0.32 * early_factor), 0.58), 4)
    reward_components.roas_improvement = round(max(min(roas_delta * 0.12, 0.30), -0.18), 4)
    reward_components.signal_quality_gain = round(min(sig_delta * 0.60, 0.25), 4)
    reward_components.action_validity = 0.10 if valid else -0.06
    reward_components.step_efficiency = 0.05 if new_state.step_count <= new_state.optimal_steps_hint else -0.03
    reward_components.timing_quality = round(timing_bonus + (0.08 * delayed_recovery), 4)
    reward_components.uncertainty_handling = round(uncertainty_bonus, 4)
    reward_components.long_term_gain = round(max(min((momentum_delta * 0.26) + (0.08 * delayed_recovery), 0.14), -0.10), 4)
    reward_components.issue_resolution_progress = round(min(issue_progress * 0.18, 0.12), 4)
    reward_components.redundancy_penalty = round(_redundancy_penalty(action.action_type, action_count), 4)
    # Consecutive-repeat penalty folded into the redundancy component.
    # NOTE(review): the same -0.05 * repeat_count (and -0.15 at >= 3) is also
    # subtracted from immediate_reward further below.
    repetition_penalty = -0.05 * repeat_count
    if repeat_count >= 3:
        repetition_penalty -= 0.15
    reward_components.redundancy_penalty = round(
        reward_components.redundancy_penalty + repetition_penalty,
        4,
    )
    # Penalize redundant actions with stronger recency awareness (last 2 steps).
    recent_redundancy = 0.0
    if action.action_type in {prev_action, prev2_action} and action.action_type:
        recent_redundancy -= 0.03
    reward_components.redundancy_penalty = round(
        reward_components.redundancy_penalty + recent_redundancy,
        4,
    )
    # Encourage action diversity until convergence is reached.
    if (not converged_before) and action.action_type not in {prev_action, prev2_action}:
        reward_components.uncertainty_handling = round(reward_components.uncertainty_handling + 0.02, 4)
    # The checks below are independent flat adjustments; several can apply in one step.
    if new_state.confidence_score < 0.50 and action.action_type not in {"investigate_attribution", "no_op"}:
        reward_components.uncertainty_handling = round(reward_components.uncertainty_handling - 0.05, 4)
        reward_components.redundancy_penalty = round(reward_components.redundancy_penalty - 0.04, 4)
    # A valid action that produced (almost) no measurable change.
    if valid and gap_delta < 0.005 and sig_delta < 0.005 and delayed_recovery < 0.10:
        reward_components.redundancy_penalty = round(reward_components.redundancy_penalty - 0.06, 4)
    if action.action_type == "adjust_attribution_window" and new_state.difficulty == "easy":
        reward_components.timing_quality = round(reward_components.timing_quality - 0.04, 4)
    if new_state.day <= 2 and not new_state.tracking_investigated and action.action_type != "investigate_attribution":
        reward_components.redundancy_penalty = round(reward_components.redundancy_penalty - 0.08, 4)
    if action.action_type == "promote_ad" and not stable_stack:
        reward_components.redundancy_penalty = round(reward_components.redundancy_penalty - 0.04, 4)
    if action.action_type == "promote_ad" and action_count > 2:
        reward_components.redundancy_penalty = round(reward_components.redundancy_penalty - min(0.02 * (action_count - 2), 0.08), 4)
    if action.action_type == "promote_ad" and roas_delta < 0.08:
        reward_components.redundancy_penalty = round(reward_components.redundancy_penalty - 0.04, 4)
    if _all_issues_resolved(new_state) and action.action_type != "no_op":
        reward_components.redundancy_penalty = round(reward_components.redundancy_penalty - 0.06, 4)
    if "paused_bad_adsets" in new_state.issues_remaining and "paused_bad_adsets" not in new_state.issues_resolved:
        if action.action_type in {"promote_ad", "reallocate_to_top_performers"}:
            reward_components.redundancy_penalty = round(reward_components.redundancy_penalty - 0.05, 4)
    if new_state.day >= 6 and "paused_bad_adsets" in new_state.issues_remaining and "paused_bad_adsets" not in new_state.issues_resolved:
        if action.action_type != "pause_underperforming_adsets":
            reward_components.redundancy_penalty = round(reward_components.redundancy_penalty - 0.04, 4)
    if converged_before and action.action_type != "no_op":
        reward_components.redundancy_penalty = round(reward_components.redundancy_penalty - 0.08, 4)
        reward_components.timing_quality = round(reward_components.timing_quality - 0.08, 4)
    if new_state.day >= (new_state.max_steps - 2) and action.action_type in {"add_utm_tracking", "segment_audience", "change_bid_strategy"}:
        reward_components.redundancy_penalty = round(reward_components.redundancy_penalty - 0.05, 4)
    # Ordering bonus from the helper, jittered by ±15%.
    ordering_component = _ordering_bonus(new_state, action.action_type) * self.rng.uniform(0.85, 1.15)
    if valid and (gap_delta > 0.008 or sig_delta > 0.008 or roas_delta > 0.04):
        ordering_component += 0.01
    reward_components.timing_quality = round(
        reward_components.timing_quality + ordering_component + delayed_release_bonus + (0.03 * early_factor),
        4,
    )
    # Random ±5% factor applied to the combined reward.
    reward_scale = self.rng.uniform(0.95, 1.05)
    # Immediate part: components that describe the quality of the decision itself.
    immediate_reward = (
        (0.45 * reward_components.action_validity)
        + (0.45 * reward_components.step_efficiency)
        + (0.50 * reward_components.timing_quality)
        + (0.35 * reward_components.uncertainty_handling)
        + (0.60 * reward_components.redundancy_penalty)
    )
    step_penalty = 0.03
    immediate_reward -= step_penalty
    immediate_reward -= (0.05 * repeat_count)
    if repeat_count >= 3:
        immediate_reward -= 0.15
        info["effects"].append("Hard repetition penalty applied")
    # Delayed part: outcome-based components accumulate in a buffer that is
    # paid out gradually.
    delayed_credit = (
        (0.80 * reward_components.attribution_accuracy)
        + (0.80 * reward_components.roas_improvement)
        + (0.70 * reward_components.signal_quality_gain)
        + (0.60 * reward_components.long_term_gain)
        + (0.55 * reward_components.issue_resolution_progress)
    )
    new_state.delayed_reward_buffer = round(new_state.delayed_reward_buffer + delayed_credit, 6)
    # Release 45% of the buffer this step, clamped to [-0.40, 0.70]; the rest stays buffered.
    delayed_reward = max(min(new_state.delayed_reward_buffer * 0.45, 0.70), -0.40)
    new_state.delayed_reward_buffer = round(new_state.delayed_reward_buffer - delayed_reward, 6)
    new_state.delayed_reward_released_last_step = round(delayed_reward, 4)
    terminal_bonus = 0.0
    # step_count/day are incremented later in this method, hence the "+ 1" here.
    done_candidate = (
        (new_state.step_count + 1) >= new_state.max_steps
        or (_all_issues_resolved(new_state) and (new_state.day + 1) >= 2)
        or _is_converged(new_state)
    )
    if _major_issues_resolved(new_state) and _is_near_optimal(new_state):
        done_candidate = True
        info["effects"].append("Episode terminated: major issues resolved and system near optimal")
    # Stagnation tracking: count consecutive steps with negligible overall improvement.
    overall_delta = gap_delta + sig_delta + max(roas_delta, 0.0) * 0.20
    convergence_delta_threshold = 0.008
    if overall_delta < convergence_delta_threshold:
        new_state.convergence_stagnation_count += 1
    else:
        new_state.convergence_stagnation_count = 0
    if (new_state.day + 1) >= 3 and new_state.convergence_stagnation_count >= 2:
        done_candidate = True
        info["effects"].append("Episode terminated on negligible improvement convergence")
    elif (new_state.day + 1) >= 3 and new_state.convergence_stagnation_count >= 1 and _metrics_stable(new_state):
        # Soft convergence ends the episode with 40% probability.
        if self.rng.random() < 0.40:
            done_candidate = True
            info["effects"].append("Episode terminated on soft convergence")
    # Steps beyond optimal_steps cost 0.03 each, capped at 0.15.
    overrun_steps = max((new_state.step_count + 1) - new_state.optimal_steps, 0)
    if overrun_steps > 0:
        extra_efficiency_penalty = min(0.03 * overrun_steps, 0.15)
        immediate_reward -= extra_efficiency_penalty
        info["effects"].append(f"Efficiency penalty applied ({extra_efficiency_penalty:.2f})")
    if converged_before and action.action_type != "no_op":
        immediate_reward -= 0.06
    if done_candidate:
        # Terminal bonus mixes final gap, relative true-ROAS gain versus the input
        # state, tracking reliability and remaining step budget.
        final_gap_pred = after_gap
        roas_gain_pred = max((c.true_roas - state.campaign.true_roas) / max(state.campaign.true_roas, 0.01), 0.0)
        signal_pred = new_state.tracking_reliability
        efficiency_pred = max(0.0, 1.0 - ((new_state.step_count + 1) / max(new_state.max_steps, 1)))
        terminal_bonus = max(
            min(
                (0.30 * max(1.0 - final_gap_pred, 0.0))
                + (0.30 * min(roas_gain_pred, 1.5))
                + (0.20 * signal_pred)
                + (0.20 * efficiency_pred),
                0.90,
            ),
            0.0,
        )
        # Finishing within 6 steps is rewarded; finishing at the step limit is penalised.
        if (new_state.step_count + 1) <= 6:
            terminal_bonus += 0.08
        elif (new_state.step_count + 1) >= (new_state.max_steps - 1):
            terminal_bonus -= 0.08
        if (new_state.step_count + 1) <= max(new_state.optimal_steps - 1, 1):
            terminal_bonus += 0.10
        if converged_before and action.action_type != "no_op":
            terminal_bonus -= 0.05
        terminal_bonus *= 0.80
    new_state.terminal_bonus_last_step = round(terminal_bonus, 4)
    raw_total = ((immediate_reward * 0.60) + (delayed_reward * 1.20) + (terminal_bonus * 1.30)) * reward_scale
    no_effect_action = (gap_delta < 0.003) and (sig_delta < 0.003) and (abs(roas_delta) < 0.03)
    # NOTE(review): the next two blocks are guarded by the same flag, so the flat
    # penalty and the 0.40 dampening always apply together; the second message
    # says "Late-stage" but no stage check is present.
    if no_effect_action:
        raw_total -= 0.05
        info["effects"].append("No measurable improvement penalty applied")
    if no_effect_action:
        raw_total *= 0.40
        info["effects"].append("Late-stage negligible-improvement dampening applied")
    # Apply diminishing returns on repeated local behavior.
    repeat_scale = max(0.55, 1.0 - (0.10 * max(repeat_count - 1, 0)))
    raw_total *= repeat_scale
    # Repeated ineffective actions should be negative (not neutral).
    ineffective_repeat = same_as_previous and (gap_delta <= 0.001) and (sig_delta <= 0.001) and (roas_delta <= 0.0)
    if ineffective_repeat:
        raw_total -= 0.05
    if action.action_type == "promote_ad" and not stable_stack:
        raw_total -= 0.12
        info["effects"].append("Premature promotion penalty applied")
    # On "hard", once everything is resolved only budget-type actions and no_op are free.
    if new_state.difficulty == "hard" and _all_issues_resolved(new_state):
        allowed_post_opt = {"reallocate_to_top_performers", "pause_underperforming_adsets", "adjust_budget_allocation", "no_op"}
        if action.action_type not in allowed_post_opt:
            raw_total -= 0.06
            info["effects"].append("Post-optimization meaningless action penalty applied")
    if _is_near_optimal(new_state) and action.action_type != "no_op":
        raw_total -= 0.05
        info["effects"].append("Late-stage unnecessary action penalty applied")
    # An unrecovered early mistake keeps costing 0.03 per step; recovery earns 0.02 per step.
    if new_state.early_wrong_decision and (not new_state.recovered_after_wrong_decision):
        raw_total -= 0.03
    if new_state.recovered_after_wrong_decision:
        raw_total += 0.02
    # Medium tasks are intentionally capped to avoid unrealistically high step rewards.
    if new_state.difficulty == "medium" and raw_total > 0.5:
        raw_total *= 0.8
    # Explicit reward normalization for stable learning and predictable penalties.
    total = round(min(max(raw_total, -0.2), 1.0), 4)
    if action.action_type == "no_op":
        # no_op is pinned to a narrow band: [0.005, 0.02] when the system is solved,
        # otherwise a small negative value no lower than -0.05.
        solved_state = _all_issues_resolved(new_state) and _is_converged(new_state)
        if solved_state:
            total = min(total, 0.02)
            total = max(total, 0.005)
            info["effects"].append("no_op accepted: system already optimized")
        else:
            inactivity_penalty = -0.02 - min(0.01 * max(repeat_count - 1, 0), 0.03)
            total = min(total, inactivity_penalty)
            total = max(total, -0.05)
            info["effects"].append("no_op inactivity penalty applied")
    if action.action_type == "investigate_attribution":
        # Investigation never scores below 0.02 (the message is appended even when
        # the floor did not change the value).
        total = max(total, 0.02)
        info["effects"].append("Investigate action floor reward applied")
    reward = Reward(
        total=total,
        components=reward_components,
        explanation=(
            f"day={new_state.day} gap_delta={gap_delta:.2%} signal_delta={sig_delta:.2%} "
            f"roas_delta={roas_delta:.2f} momentum={new_state.growth_momentum:.2f} "
            f"repeat_count={repeat_count} immediate={immediate_reward:.3f} delayed={delayed_reward:.3f} terminal={terminal_bonus:.3f} reward_scale={reward_scale:.3f} raw_total={raw_total:.3f} confidence={new_state.confidence_score:.2f}"
        ),
    )
    # ── Advance step ──────────────────────────────────────────────────
    new_state.step_count += 1
    new_state.day += 1
    new_state.cumulative_reward += total
    new_state.attribution_gap_history.append(_attribution_gap(c))
    new_state.roas_history.append(c.reported_roas)
    new_state.signal_quality_history.append(new_state.tracking_reliability)
    # History entry; "risk_events" holds the effect messages that mention
    # "penalty", "risk" or "reduced" (case-insensitive).
    new_state.history.append({
        "step": new_state.step_count,
        "day": new_state.day,
        "action": action.action_type,
        "reasoning": action.reasoning or "",
        "reward": total,
        "immediate_reward": round(immediate_reward, 4),
        "delayed_reward": round(delayed_reward, 4),
        "terminal_bonus": round(terminal_bonus, 4),
        "delayed_release": new_state.delayed_conversion_release_last_step,
        "tracked_release": new_state.tracked_conversion_release_last_step,
        "modeled_release": new_state.modeled_conversion_release_last_step,
        "risk_events": [e for e in info["effects"] if "penalty" in e.lower() or "risk" in e.lower() or "reduced" in e.lower()],
        "effects": info["effects"],
    })
    if action.reasoning:
        new_state.reasoning_log.append(action.reasoning)
    # The same filtered effect messages are also appended to the state-level risk
    # log, next to the short event codes added earlier in this method.
    new_state.risk_events.extend([e for e in info["effects"] if "penalty" in e.lower() or "risk" in e.lower() or "reduced" in e.lower()])
    # ── Check done ────────────────────────────────────────────────────
    min_steps_required = 3 if new_state.difficulty == "easy" else 2
    # On "easy", resolution/convergence-based termination needs >= 3 non-no_op actions.
    easy_action_gate = (
        new_state.difficulty != "easy"
        or new_state.easy_meaningful_actions_taken >= 3
    )
    done = (
        new_state.step_count >= new_state.max_steps
        or done_candidate
        or (_all_issues_resolved(new_state) and new_state.day >= min_steps_required and easy_action_gate)
        or (_major_issues_resolved(new_state) and _is_near_optimal(new_state))
        or (new_state.convergence_stagnation_count >= 2 and _metrics_stable(new_state))
        or (_is_converged(new_state) and easy_action_gate)
    )
    if done and _is_converged(new_state):
        new_state.convergence_reached = True
        # NOTE(review): this bonus goes to cumulative_reward only; the Reward object
        # returned for this step was built earlier and does not include it.
        new_state.cumulative_reward += 0.06
    new_state.done = done
    return new_state, reward, done, info
def _simulate_day(self, state: EnvState, avg_order_value: float) -> None:
    """Advance the campaign by one simulated day, mutating ``state`` in place.

    The day runs in phases:

    1. On the first call of an episode, possibly record one rare event
       (``tracking_drop``, ``modeled_noise`` or ``delay_spike``).
    2. Perturb tracking reliability and the iOS traffic share with bounded noise.
    3. For every non-paused ad set: spend a slice of budget, derive impressions
       and clicks, sample conversions per click and queue them as
       ``PendingConversion`` entries keyed by delay (days).
    4. Age the pending queue by one day and release matured conversions,
       splitting each release into observed, modeled and hidden parts.
    5. Try to surface conversions that were previously hidden.
    6. Update the budget-optimisation multiplier, tracking reliability and
       growth momentum, then recompute ROAS and CPA.

    All randomness is drawn from ``self.rng``; the order of draws matters for
    reproducibility.

    Args:
        state: Environment state. Its campaign, ad sets, queues, counters and
            risk-event log are updated in place.
        avg_order_value: Base order value; it is scaled by the value
            multipliers computed below before being passed to ``compute_roas``.

    Returns:
        None.
    """
    # NOTE(review): block nesting below was reconstructed from data flow
    # (the stored text carries no indentation) — confirm against the canonical file.
    campaign = state.campaign
    # Only used to rotate the fallback delay when an ad set samples no conversion.
    day_seed = state.day + 1

    # ── One-off rare event, rolled the first time this runs for an episode ──
    if not state.episode_risk_initialized:
        state.episode_risk_initialized = True
        # The trigger probability is itself drawn from [0.10, 0.15].
        if self.rng.random() < self.rng.uniform(0.10, 0.15):
            event = self.rng.choice(["tracking_drop", "modeled_noise", "delay_spike"])
            state.risk_events.append(event)
            state.episode_rare_events.append(event)
            # "modeled_noise" has no immediate effect here; it is consulted
            # later when modeled conversions are released.
            if event == "tracking_drop":
                state.tracking_reliability = max(state.tracking_reliability - self.rng.uniform(0.10, 0.20), 0.15)
            elif event == "delay_spike":
                # Extra batch sized from the hidden pool (never fewer than 6), maturing in 2 days.
                spike_pool = max(int(state.hidden_conversions_pool * self.rng.uniform(0.10, 0.18)), 6)
                if state.campaign.adsets:
                    # Pick a random active ad set; fall back to the first ad set when all are paused.
                    source_ids = [a.adset_id for a in state.campaign.adsets if not a.is_paused] or [state.campaign.adsets[0].adset_id]
                    state.pending_delayed_conversions.append(
                        PendingConversion(
                            source_adset_id=self.rng.choice(source_ids),
                            clicks=spike_pool,
                            expected_conversions=spike_pool,
                            value=spike_pool,
                            delay_days_remaining=2,
                            original_delay_days=2,
                        )
                    )

    # ── Noise amplitudes grow with difficulty ──
    if state.difficulty == "easy":
        reliability_amp = 0.05
        delay_jitter_amp = 0.04
    elif state.difficulty == "medium":
        reliability_amp = 0.08
        delay_jitter_amp = 0.06
    else:
        reliability_amp = 0.10
        delay_jitter_amp = 0.08
    reliability_noise = self.rng.uniform(-reliability_amp, reliability_amp)
    effective_tracking_reliability = min(max(state.tracking_reliability + reliability_noise, 0.20), 0.99)
    # Controlled stochasticity: small day-to-day drift and rare disruptive events.
    campaign.ios_traffic_pct = min(max(campaign.ios_traffic_pct + self.rng.uniform(-0.01, 0.01), 0.10), 0.85)
    # A tracking drop is applied only while no "tracking_drop" is logged yet.
    if "tracking_drop" not in state.risk_events and self.rng.random() < (0.16 if state.difficulty == "hard" else 0.12):
        drop = self.rng.uniform(0.10, 0.16)
        effective_tracking_reliability = max(effective_tracking_reliability - drop, 0.15)
        state.risk_events.append("tracking_drop")
    # Without the Conversions API there is an additional chance of a small degradation.
    if not campaign.conversions_api_enabled and self.rng.random() < 0.22:
        effective_tracking_reliability = max(effective_tracking_reliability - self.rng.uniform(0.02, 0.06), 0.15)
        state.risk_events.append("signal_degradation_event")
    # The noisy value is persisted, so today's noise carries into later days.
    state.tracking_reliability = effective_tracking_reliability

    # ── Per-day accumulators and per-step caps ──
    daily_impressions = 0
    daily_clicks = 0
    daily_spend = 0.0
    released_total = 0
    tracked_release = 0
    modeled_release = 0
    remaining_generation_cap = _budgeted_step_cap(state, phase="generate")
    remaining_release_cap = _budgeted_step_cap(state, phase="release")
    min_delay, max_delay = state.scenario_delay_range[0], state.scenario_delay_range[1]

    # ── Generation: spend, impressions, clicks and queued conversions per ad set ──
    for idx, adset in enumerate(campaign.adsets):
        if adset.is_paused:
            continue
        # Unknown segments fall back to the "broad_interest" parameters.
        params = SEGMENT_PARAMS.get(adset.audience_segment, SEGMENT_PARAMS["broad_interest"])
        decay = SEGMENT_DECAY.get(adset.audience_segment, 0.010)
        # Performance decays linearly with the day number, floored at 65%.
        age_penalty = max(0.65, 1.0 - (state.day * decay))
        # Budget still available after what earlier ad sets spent today.
        remaining_budget = max(campaign.total_budget - campaign.budget_spent - daily_spend, 0.0)
        active_count = max(len([a for a in campaign.adsets if not a.is_paused]), 1)
        fair_share = remaining_budget / active_count
        # Spend 8% of the ad set budget per day, capped by its fair share of what is left.
        daily_budget = min(max(adset.budget * 0.08, 0.0), fair_share)
        momentum_factor = max(min(state.growth_momentum, 1.8), 0.5)
        impressions = int(daily_budget * params["imp_per_usd"] * momentum_factor * age_penalty)
        effective_ctr = min(max(params["ctr"] * 1.08 * age_penalty, 0.01), 0.03)
        clicks = int(impressions * effective_ctr)
        conversion_probability = min(
            max(params["cvr"] * age_penalty * (1.0 + 0.08 * (state.growth_momentum - 1.0)), 0.002),
            0.22,
        )
        # Better tracking and a wider attribution window raise the conversion rate.
        optimization_quality = 0.86 + (0.40 * state.tracking_reliability)
        attribution_learning = 0.90 + (0.26 * WINDOW_COVERAGE.get(campaign.attribution_window, 0.72))
        conversion_probability = min(
            conversion_probability * optimization_quality * attribution_learning * state.budget_optimization_multiplier,
            0.28,
        )
        # Penalties for leaving known issues unaddressed as the episode progresses.
        if state.campaign.attribution_window == "1d_click" and state.day >= 1:
            conversion_probability *= 0.92
        if state.day >= 4 and not campaign.conversions_api_enabled:
            conversion_probability *= 0.92
        if state.day >= 5 and not state.tracking_investigated:
            conversion_probability *= 0.90
        # Clamp into the scenario's rate range, then blend 65/35 toward its midpoint.
        low_rate, high_rate = state.conversion_rate_range[0], state.conversion_rate_range[1]
        calibration_mid = (low_rate + high_rate) * 0.5
        conversion_probability = min(max(conversion_probability, low_rate), high_rate)
        conversion_probability = 0.65 * conversion_probability + 0.35 * calibration_mid
        delay_span = max(max_delay - min_delay + 1, 1)
        # delay (days) -> number of conversions that will mature after that delay
        conversions_by_delay: Dict[int, int] = {}
        for click_i in range(clicks):
            if remaining_generation_cap <= 0:
                break
            sample = self.rng.random()
            if sample < conversion_probability:
                # Choose a delay by sampling the jittered, normalised delay weights.
                delay_pick = self.rng.random()
                delay = min_delay
                cumulative = 0.0
                weights = []
                for d in range(delay_span):
                    delay_day = min_delay + d
                    weight_noise = self.rng.uniform(-delay_jitter_amp, delay_jitter_amp)
                    weights.append(max(_delay_weight(delay_day) + weight_noise, 0.01))
                weight_sum = sum(weights)
                for d_idx, weight in enumerate(weights):
                    cumulative += weight / weight_sum
                    if delay_pick <= cumulative:
                        delay = min_delay + d_idx
                        break
                # Occasionally shift the delay by up to two days, kept inside the scenario range.
                if self.rng.random() < 0.45:
                    jitter = self.rng.choice([-1, 0, 1])
                    if self.rng.random() < 0.18:
                        jitter += self.rng.choice([-1, 1])
                    delay = min(max(delay + jitter, min_delay), max_delay)
                conversions_by_delay[delay] = conversions_by_delay.get(delay, 0) + 1
                remaining_generation_cap -= 1
        # Guarantee one conversion for an ad set with more than 80 clicks that sampled none.
        if clicks > 80 and not conversions_by_delay and remaining_generation_cap > 0:
            fallback_delay = min_delay + ((day_seed + idx) % delay_span)
            conversions_by_delay[fallback_delay] = 1
            remaining_generation_cap -= 1
        for delay, conv_count in conversions_by_delay.items():
            state.pending_delayed_conversions.append(
                PendingConversion(
                    source_adset_id=adset.adset_id,
                    clicks=clicks,
                    expected_conversions=conv_count,
                    value=conv_count,
                    delay_days_remaining=delay,
                    original_delay_days=delay,
                )
            )
        adset.spent = round(adset.spent + daily_budget, 2)
        adset.impressions += impressions
        adset.link_clicks += clicks
        daily_impressions += impressions
        daily_clicks += clicks
        daily_spend += daily_budget
    campaign.impressions += daily_impressions
    campaign.link_clicks += daily_clicks
    campaign.budget_spent = round(min(campaign.total_budget, campaign.budget_spent + daily_spend), 2)

    # ── Release: age the queue by one day and split it into matured / still pending ──
    matured: List[PendingConversion] = []
    remaining: List[PendingConversion] = []
    for item in state.pending_delayed_conversions:
        updated = item.model_copy(deep=True)
        updated.delay_days_remaining -= 1
        if updated.delay_days_remaining <= 0:
            matured.append(updated)
        else:
            remaining.append(updated)
    state.pending_delayed_conversions = remaining
    # NOTE(review): this copy is taken before deferred/spillover items are
    # re-queued below, so it will not include them — confirm that is intended.
    state.pending_conversions = list(state.pending_delayed_conversions)
    for item in matured:
        # Release cap exhausted: push the whole item back by one day.
        if remaining_release_cap <= 0:
            state.pending_delayed_conversions.append(
                PendingConversion(
                    source_adset_id=item.source_adset_id,
                    clicks=item.clicks,
                    expected_conversions=item.expected_conversions,
                    value=item.value,
                    delay_days_remaining=1,
                    original_delay_days=item.original_delay_days,
                )
            )
            continue
        adset = next((a for a in campaign.adsets if a.adset_id == item.source_adset_id), None)
        # Items whose source ad set cannot be found are dropped.
        if adset is None:
            continue
        # Release as much as the cap allows; the rest spills over to tomorrow.
        true_conv = min(item.expected_conversions, remaining_release_cap)
        spillover = max(item.expected_conversions - true_conv, 0)
        if spillover > 0:
            state.pending_delayed_conversions.append(
                PendingConversion(
                    source_adset_id=item.source_adset_id,
                    clicks=item.clicks,
                    expected_conversions=spillover,
                    value=spillover,
                    delay_days_remaining=1,
                    original_delay_days=item.original_delay_days,
                )
            )
        remaining_release_cap -= true_conv
        campaign.true_conversions += true_conv
        adset.true_conversions += true_conv
        state.delayed_true_conversions_total += true_conv
        # Split the released conversions into tracked (observed) and modeled counts.
        observed, modeled = _materialize_observed_signals(
            true_conversions=true_conv,
            delay_days=item.original_delay_days,
            attribution_window=campaign.attribution_window,
            tracking_reliability=effective_tracking_reliability,
            modeled_enabled=campaign.modeled_conversions_enabled,
            aem_enabled=campaign.aem_enabled,
        )
        # Easy mode with a 7d/28d window boosts observed conversions, bounded by
        # what is left of true_conv after the modeled share.
        if state.difficulty == "easy" and campaign.attribution_window in {"7d_click", "7d_click_1d_view", "28d_click"}:
            easy_boost = 2.80 if state.tracking_investigated else 2.20
            observed = min(max(int(round(observed * easy_boost)), observed), max(true_conv - modeled, 0))
        # Modeled over-estimation; can trigger only while "modeled_noise" is not yet logged.
        if campaign.modeled_conversions_enabled and ("modeled_noise" not in state.risk_events) and self.rng.random() < 0.15:
            extra_modeled = int(round(modeled * self.rng.uniform(0.10, 0.22)))
            if extra_modeled > 0:
                modeled += extra_modeled
                state.risk_events.append("modeled_noise")
                state.risk_events.append("modeled_overestimation")
        # Once modeled noise is active (from either log), distort modeled counts by about ±5%,
        # with an extra downward factor when AEM is off.
        if campaign.modeled_conversions_enabled and "modeled_noise" in (state.risk_events + state.episode_rare_events):
            distortion = self.rng.uniform(0.95, 1.05)
            if not campaign.aem_enabled:
                distortion *= self.rng.uniform(0.95, 1.00)
            modeled = max(int(round(modeled * distortion)), 0)
        campaign.reported_conversions += observed + modeled
        adset.reported_conversions += observed + modeled
        state.delayed_reported_conversions_total += observed
        state.tracked_conversions_total += observed
        state.modeled_conversions_total += modeled
        # Whatever was neither observed nor modeled stays hidden for possible later reveal.
        hidden = max(true_conv - observed - modeled, 0)
        state.hidden_conversions_pool += hidden
        if hidden > 0:
            state.hidden_delayed_conversions.append(
                PendingConversion(
                    source_adset_id=item.source_adset_id,
                    clicks=item.clicks,
                    expected_conversions=hidden,
                    value=hidden,
                    delay_days_remaining=0,
                    original_delay_days=item.original_delay_days,
                )
            )
        released_total += true_conv
        tracked_release += observed
        modeled_release += modeled
    state.delayed_conversion_release_last_step = released_total
    state.tracked_conversion_release_last_step = tracked_release
    state.modeled_conversion_release_last_step = modeled_release

    # ── Reveal previously hidden conversions ──
    if state.difficulty == "easy":
        hidden_release_cap = max(int(state.max_released_conversions_per_step * 2.00), 60)
    else:
        hidden_release_cap = min(
            max(int(state.max_released_conversions_per_step * 0.75), 12),
            max(int(state.max_released_conversions_per_step), 1),
        )
    # Reveals get at least hidden_release_cap even when the release cap is used up.
    newly_visible_hidden = _reveal_currently_visible_hidden_events(
        state,
        release_cap=max(remaining_release_cap, hidden_release_cap),
    )
    if newly_visible_hidden > 0:
        # NOTE(review): hidden conversions were already included in
        # delayed_true_conversions_total when they matured (true_conv above);
        # confirm this second increment is intended.
        state.delayed_true_conversions_total += newly_visible_hidden
        state.tracked_conversion_release_last_step += newly_visible_hidden

    # ── End-of-day state updates ──
    state.uncertainty_reintroduced = (
        state.day >= 3
        and state.tracking_investigated
        and state.tracking_reliability < 0.55
        and len(state.pending_delayed_conversions) > 3
        and not campaign.modeled_conversions_enabled
    )
    budget_actions_taken = (
        state.action_counts.get("reallocate_to_top_performers", 0)
        + state.action_counts.get("pause_underperforming_adsets", 0)
        + state.action_counts.get("adjust_budget_allocation", 0)
    )
    # Budget actions compound only once tracking is investigated and CAPI is on;
    # with no budget actions at all the multiplier decays toward 0.95.
    if budget_actions_taken > 0 and state.tracking_investigated and campaign.conversions_api_enabled:
        compounding = min(0.01 * budget_actions_taken, 0.05)
        state.budget_optimization_multiplier = min(state.budget_optimization_multiplier + compounding, 2.10)
    elif budget_actions_taken == 0:
        state.budget_optimization_multiplier = max(state.budget_optimization_multiplier - 0.01, 0.95)
    # Tracking keeps eroding from day 2 while the Conversions API is off.
    if not campaign.conversions_api_enabled and state.day >= 2:
        state.tracking_reliability = max(state.tracking_reliability - 0.015, 0.20)
    # More than two "promote_ad" actions start to drain growth momentum.
    if action_decay := state.action_counts.get("promote_ad", 0):
        if action_decay > 2:
            state.growth_momentum = max(state.growth_momentum - (0.01 * (action_decay - 2)), 0.50)

    # ── Value multipliers and ROAS / CPA ──
    delayed_recovery_ratio = min(
        state.delayed_conversion_release_last_step / max(state.max_released_conversions_per_step, 1),
        1.0,
    )
    # Lift is measured relative to the 1d_click window and to a 0.45 reliability baseline.
    attribution_lift = max(WINDOW_COVERAGE.get(campaign.attribution_window, 0.72) - WINDOW_COVERAGE.get("1d_click", 0.30), 0.0)
    signal_lift = max(effective_tracking_reliability - 0.45, 0.0)
    recovered_value_multiplier = min(1.0 + (0.75 * delayed_recovery_ratio) + (0.55 * attribution_lift) + (0.40 * signal_lift), 2.35)
    true_value_multiplier = min(1.0 + (0.45 * delayed_recovery_ratio) + (0.30 * attribution_lift) + (0.25 * signal_lift), 1.85)
    reported_value = avg_order_value * recovered_value_multiplier
    true_value = avg_order_value * true_value_multiplier
    for adset in campaign.adsets:
        adset.reported_roas = compute_roas(adset.reported_conversions, reported_value, adset.spent)
        adset.true_roas = compute_roas(adset.true_conversions, true_value, adset.spent)
    campaign.reported_roas = compute_roas(campaign.reported_conversions, reported_value, campaign.budget_spent)
    campaign.true_roas = compute_roas(campaign.true_conversions, true_value, campaign.budget_spent)
    # 9999 is the sentinel CPA when there are no conversions to divide by.
    campaign.reported_cpa = (
        round(campaign.budget_spent / campaign.reported_conversions, 2)
        if campaign.reported_conversions > 0
        else 9999
    )
    campaign.true_cpa = (
        round(campaign.budget_spent / campaign.true_conversions, 2)
        if campaign.true_conversions > 0
        else 9999
    )
# ─── Helpers ─────────────────────────────────────────────────────────────────
def _attribution_gap(c: CampaignData) -> float:
    """Return the share of true conversions that reporting is missing.

    The value is ``(true - reported) / true`` floored at 0.0, so
    over-reporting never produces a negative gap. A campaign with zero true
    conversions has a gap of 0.0.
    """
    true_total = c.true_conversions
    if true_total == 0:
        return 0.0
    shortfall = (true_total - c.reported_conversions) / true_total
    return max(shortfall, 0.0)
def _all_issues_resolved(state: EnvState) -> bool:
    """Return True when every entry of ``issues_remaining`` is also in ``issues_resolved``."""
    return set(state.issues_remaining).issubset(state.issues_resolved)
def _budgeted_step_cap(state: EnvState, phase: str) -> int:
    """Return how many conversions may be generated or released this step.

    The base cap is ``max_generated_conversions_per_step`` when ``phase`` is
    ``"generate"`` and ``max_released_conversions_per_step`` for any other
    phase (never below 1). It is then adjusted by how many true conversions
    the campaign already has, relative to ``target_true_conversions``:

    * below 60% of the target: the full base cap;
    * at or above 200% of the target: 0;
    * in between: the base cap tapered linearly as the count approaches 120%
      of the target, never below 20% of the base cap and never below 1.
    """
    per_step_limit = (
        state.max_generated_conversions_per_step
        if phase == "generate"
        else state.max_released_conversions_per_step
    )
    base_cap = max(int(per_step_limit), 1)

    target = state.target_true_conversions
    low_target = max(int(0.6 * target), 1)
    preferred_high_target = max(int(1.2 * target), low_target + 1)
    hard_upper_target = max(int(2.0 * target), preferred_high_target + 1)

    current_true = int(state.campaign.true_conversions)
    if current_true >= hard_upper_target:
        return 0
    if current_true < low_target:
        return base_cap

    # Taper per-step conversions as the trajectory approaches the preferred realism bound.
    progress = min(max(current_true / preferred_high_target, 0.0), 1.0)
    low_progress = low_target / preferred_high_target
    taper = 1.0 - ((progress - low_progress) / max(1.0 - low_progress, 1e-6))
    taper = min(max(taper, 0.20), 1.0)
    return max(int(round(base_cap * taper)), 1)
def _deterministic_bucket(value: int, modulo: int = 1000) -> float:
    """Return ``(value % modulo) / modulo`` as a float.

    For a positive ``modulo`` the result lies in ``[0, 1)``.
    """
    remainder = value % modulo
    return remainder / float(modulo)
def _deterministic_noise(seed: int, amplitude: float) -> float:
    """Return a repeatable offset derived from ``seed``, scaled by ``amplitude``.

    The seed is bucketed modulo 10000, re-centred around zero and scaled by
    ``2 * amplitude``.
    """
    centered = _deterministic_bucket(seed, 10000) - 0.5
    return centered * 2.0 * amplitude
def _delay_weight(delay_days: int) -> float:
    """Return the base sampling weight for a conversion delay, in days.

    Delays up to 3 days weigh 0.60, delays of 4 or 5 days weigh 0.28 and
    anything longer weighs 0.12.
    """
    for upper_bound, weight in ((3, 0.60), (5, 0.28)):
        if delay_days <= upper_bound:
            return weight
    return 0.12
def _issue_resolution_fraction(state: EnvState) -> float:
    """Return the fraction of ``issues_remaining`` that appear in ``issues_resolved``.

    An empty ``issues_remaining`` counts as fully resolved (1.0).
    """
    outstanding = set(state.issues_remaining)
    if len(outstanding) == 0:
        return 1.0
    resolved_count = len(outstanding.intersection(state.issues_resolved))
    return resolved_count / len(outstanding)
def _materialize_observed_signals(
    true_conversions: int,
    delay_days: int,
    attribution_window: str,
    tracking_reliability: float,
    modeled_enabled: bool,
    aem_enabled: bool,
) -> Tuple[int, int]:
    """Split released true conversions into ``(observed, modeled)`` counts.

    Observed conversions are zero when ``delay_days`` exceeds the attribution
    window length; otherwise they are the window's coverage share of
    ``true_conversions`` scaled by ``tracking_reliability``. When modeled
    reporting is enabled, a share of the unattributed remainder is modeled
    (larger with AEM), and observed conversions are shifted into the modeled
    bucket if needed so that modeled makes up a minimum share of the total.

    Returns:
        A pair of non-negative integers ``(observed, modeled)``.
    """
    inside_window = delay_days <= WINDOW_DAYS.get(attribution_window, 7)
    if inside_window:
        coverage = WINDOW_COVERAGE.get(attribution_window, 0.72)
        observed_base = int(round(true_conversions * coverage))
    else:
        observed_base = 0
    observed = int(round(observed_base * tracking_reliability))

    if not modeled_enabled:
        return max(observed, 0), 0

    unattributed = max(true_conversions - observed, 0)
    modeled_factor = 0.42 + (0.12 if aem_enabled else 0.0)
    modeled = int(round(unattributed * modeled_factor))

    # Keep modeled share meaningful once modeled reporting is active.
    total_recovered = observed + modeled
    if total_recovered > 0:
        minimum_share = 0.30 if aem_enabled else 0.22
        required_modeled = int(round(total_recovered * minimum_share))
        if modeled < required_modeled:
            # Move conversions from observed to modeled; the total is unchanged.
            shift = min(observed, required_modeled - modeled)
            modeled += shift
            observed -= shift
    return max(observed, 0), max(modeled, 0)
def _diminishing_returns(action_count: int) -> float:
    """Return the reward multiplier for the ``action_count``-th use of an action.

    The first use keeps full value (1.0); uses two to five shrink to 0.62,
    0.38, 0.16 and 0.02; any further use is penalised with -0.08.
    """
    if action_count <= 1:
        return 1.0
    multipliers = {2: 0.62, 3: 0.38, 4: 0.16, 5: 0.02}
    return multipliers.get(action_count, -0.08)
def _redundancy_penalty(action_type: str, action_count: int) -> float:
    """Return the (non-positive) penalty for repeating an action.

    ``no_op`` always costs 0.05. Any other action is free the first time and
    then costs 0.02 per extra repetition, capped at 0.12.
    """
    if action_type == "no_op":
        return -0.05
    repeats = action_count - 1
    if repeats <= 0:
        return 0.0
    return -min(0.02 * repeats, 0.12)
def _is_stack_stable(state: EnvState) -> bool:
    """Return True when the measurement stack is fully configured.

    Requires a 7-day or 28-day attribution window, the Conversions API and AEM
    both enabled, the ``"modeled"`` reporting mode, and ``"paused_bad_adsets"``
    among the resolved issues.
    """
    campaign = state.campaign
    if campaign.attribution_window not in {"7d_click", "7d_click_1d_view", "28d_click"}:
        return False
    if not (campaign.conversions_api_enabled and campaign.aem_enabled):
        return False
    if campaign.attribution_reporting_mode != "modeled":
        return False
    return "paused_bad_adsets" in state.issues_resolved
def _is_converged(state: EnvState) -> bool:
    """Return True when the episode has reached a converged, healthy state.

    All of the following must hold: the attribution gap is below 0.10,
    tracking reliability is at least 0.92, the last three ROAS readings span
    at most 0.12 (assumed stable when fewer than three exist), and every major
    issue is listed in ``issues_resolved``.
    """
    campaign = state.campaign
    required_fixes = {"attribution_window", "conversions_api", "aem", "paused_bad_adsets", "budget_allocation"}
    outstanding = required_fixes.difference(state.issues_resolved)

    recent_roas = state.roas_history[-3:]
    roas_stable = len(state.roas_history) < 3 or (max(recent_roas) - min(recent_roas)) <= 0.12

    if not (_attribution_gap(campaign) < 0.10):
        return False
    if not (state.tracking_reliability >= 0.92):
        return False
    return roas_stable and not outstanding
def _major_issues_resolved(state: EnvState) -> bool:
    """Return True when all five major issues appear in ``issues_resolved``."""
    major_issues = frozenset(
        ("attribution_window", "conversions_api", "aem", "paused_bad_adsets", "budget_allocation")
    )
    return major_issues.issubset(state.issues_resolved)
def _is_near_optimal(state: EnvState) -> bool:
    """Return True when the campaign is close to its best achievable state.

    Requires an attribution gap below 0.12, tracking reliability of at least
    0.90, reported ROAS of at least 1.9, and stable step-over-step metrics.
    """
    campaign = state.campaign
    if not (_attribution_gap(campaign) < 0.12):
        return False
    if not (state.tracking_reliability >= 0.90):
        return False
    if not (campaign.reported_roas >= 1.9):
        return False
    return _metrics_stable(state)
def _metrics_stable(state: EnvState) -> bool:
    """Return True when the latest step barely moved ROAS, signal quality and gap.

    Needs at least two ROAS and two signal-quality samples. The last two
    readings must differ by less than 0.03 (ROAS), 0.01 (signal quality) and
    0.01 (attribution gap). With fewer than two gap samples a placeholder
    delta of 0.02 is used, which fails the gap threshold.
    """
    roas = state.roas_history
    signal = state.signal_quality_history
    if len(roas) < 2 or len(signal) < 2:
        return False

    gaps = state.attribution_gap_history
    gap_delta = abs(gaps[-1] - gaps[-2]) if len(gaps) >= 2 else 0.02
    roas_delta = abs(roas[-1] - roas[-2])
    sig_delta = abs(signal[-1] - signal[-2])
    return roas_delta < 0.03 and sig_delta < 0.01 and gap_delta < 0.01
def _ordering_bonus(state: EnvState, action_type: str) -> float:
    """Score how well ``action_type`` fits a sensible ordering of actions.

    Three reference sequences are checked. For each one that contains the
    action, the candidate score is the sequence's base bonus, plus 0.005 per
    prerequisite already present in ``state.history``, minus 0.014 per missing
    prerequisite, plus 0.006 when the action is among the first three steps of
    that sequence. The best candidate wins (never below -0.02).

    Budget-style actions taken before tracking was investigated lose a further
    0.03. ``no_op`` always scores -0.01. The result is clamped to
    ``[-0.08, 0.06]`` and rounded to four decimals.
    """
    if action_type == "no_op":
        return -0.01

    # Multiple valid paths: A is ideal, B is a strong alternative, C is risky but recoverable.
    path_a = (
        "investigate_attribution",
        "adjust_attribution_window",
        "enable_conversions_api",
        "enable_aggregated_event_measurement",
        "switch_to_modeled_conversions",
        "pause_underperforming_adsets",
        "reallocate_to_top_performers",
        "promote_ad",
    )
    path_b = (
        "enable_conversions_api",
        "enable_aggregated_event_measurement",
        "switch_to_modeled_conversions",
        "investigate_attribution",
        "adjust_attribution_window",
        "pause_underperforming_adsets",
        "reallocate_to_top_performers",
        "promote_ad",
    )
    path_c = (
        "investigate_attribution",
        "promote_ad",
        "adjust_attribution_window",
        "enable_conversions_api",
        "enable_aggregated_event_measurement",
        "switch_to_modeled_conversions",
        "pause_underperforming_adsets",
        "reallocate_to_top_performers",
    )
    weighted_paths = ((path_a, 0.016), (path_b, 0.012), (path_c, 0.008))

    # Actions already taken; steps without an "action" key contribute "".
    taken = {entry.get("action", "") for entry in state.history}

    best_bonus = -0.02
    for sequence, base_bonus in weighted_paths:
        if action_type not in sequence:
            continue
        position = sequence.index(action_type)
        prerequisites = set(sequence[:position])
        matched = len(prerequisites & taken)
        missing = len(prerequisites) - matched
        candidate = base_bonus + (0.005 * matched) - (0.014 * missing)
        if position <= 2:
            candidate += 0.006
        best_bonus = max(best_bonus, candidate)

    spend_actions = {"promote_ad", "reallocate_to_top_performers", "adjust_budget_allocation"}
    if action_type in spend_actions and not state.tracking_investigated:
        best_bonus -= 0.03
    return round(max(min(best_bonus, 0.06), -0.08), 4)
def _reveal_currently_visible_hidden_events(state: EnvState, release_cap: int) -> int:
    """Surface hidden conversions that the current attribution setup can now see.

    Each entry of ``state.hidden_delayed_conversions`` whose original delay
    fits inside the current attribution window is re-evaluated with the
    current tracking reliability and modeled/AEM settings. The part that
    becomes visible is added to the campaign's and source ad set's reported
    conversions and removed from the hidden pool; the rest stays hidden.
    Entries outside the window, or reached after ``release_cap`` is used up,
    are kept unchanged.

    Args:
        state: Environment state; reported counters, the hidden pool and the
            hidden-event list are updated in place.
        release_cap: Maximum number of conversions that may be revealed in
            this call.

    Returns:
        The number of conversions (observed plus modeled) newly reported.
    """
    # NOTE(review): nesting was reconstructed from data flow (the stored text
    # carries no indentation) — confirm against the canonical file.
    if not state.hidden_delayed_conversions:
        return 0
    window_days = WINDOW_DAYS.get(state.campaign.attribution_window, 7)
    released = 0
    remaining_hidden: List[PendingConversion] = []
    for evt in state.hidden_delayed_conversions:
        # Cap exhausted: carry the event over untouched.
        if release_cap <= 0:
            remaining_hidden.append(evt)
            continue
        if evt.original_delay_days <= window_days:
            observed, modeled = _materialize_observed_signals(
                true_conversions=evt.expected_conversions,
                delay_days=evt.original_delay_days,
                attribution_window=state.campaign.attribution_window,
                tracking_reliability=state.tracking_reliability,
                modeled_enabled=state.campaign.modeled_conversions_enabled,
                aem_enabled=state.campaign.aem_enabled,
            )
            # Easy mode with a 7d/28d window boosts observed conversions, bounded by
            # what remains of the event after the modeled share.
            if state.difficulty == "easy" and state.campaign.attribution_window in {"7d_click", "7d_click_1d_view", "28d_click"}:
                easy_boost = 2.60 if state.tracking_investigated else 2.00
                observed = min(max(int(round(observed * easy_boost)), observed), max(evt.expected_conversions - modeled, 0))
            newly_visible = observed + modeled
            # Scale both parts down proportionally so the reveal fits the remaining cap.
            if newly_visible > release_cap:
                scale = release_cap / max(newly_visible, 1)
                observed = int(round(observed * scale))
                modeled = min(release_cap - observed, int(round(modeled * scale)))
                newly_visible = observed + modeled
            if newly_visible > 0:
                released += newly_visible
                release_cap -= newly_visible
                state.campaign.reported_conversions += newly_visible
                state.tracked_conversions_total += observed
                state.modeled_conversions_total += modeled
                state.delayed_reported_conversions_total += observed
                state.hidden_conversions_pool = max(state.hidden_conversions_pool - newly_visible, 0)
                if state.campaign.adsets:
                    adset = next((a for a in state.campaign.adsets if a.adset_id == evt.source_adset_id), None)
                    if adset is not None:
                        adset.reported_conversions += newly_visible
            # Keep whatever is still invisible as a smaller hidden event.
            still_hidden = max(evt.expected_conversions - newly_visible, 0)
            if still_hidden > 0:
                remaining_hidden.append(
                    PendingConversion(
                        source_adset_id=evt.source_adset_id,
                        clicks=evt.clicks,
                        expected_conversions=still_hidden,
                        value=still_hidden,
                        delay_days_remaining=0,
                        original_delay_days=evt.original_delay_days,
                    )
                )
        else:
            # Delay exceeds the current window: the event stays hidden.
            remaining_hidden.append(evt)
    state.hidden_delayed_conversions = remaining_hidden
    return released