"""Strategy Code Generator — produces agent-readable execution plans. Generates structured conditional execution plans that another agent (or automated execution system) can parse and execute: "If market X reaches price Y within Z minutes, then BUY/SELL/CASH OUT" Plans are expressed as a DAG of ConditionalAction nodes with: - Trigger conditions (price, time, volume, sentiment, KK phase) - Actions (buy, sell, hold, hedge, cash_out, vote_yes, vote_no) - Deadlines and expiration - Escalation rules (Q-learner sized) - Causal reasoning (why this action) Output formats: - Structured Python dict (for agent consumption) - JSON (for API/webhook delivery) - Human-readable plan text Design patterns: Builder: ExecutionPlanBuilder for fluent plan construction Strategy: TriggerCondition ABC for different market triggers Chain: ConditionChain links multiple conditions with AND/OR Observer: PlanExecutionTracker monitors plan state machine Neurosymbolic: plans are SYMBOLIC rule structures; triggers are NEURAL KAN signals. """ from __future__ import annotations import json import time import uuid from abc import ABC, abstractmethod from dataclasses import dataclass, field from enum import Enum, auto from typing import Any, Callable, Dict, List, Optional, Tuple from prediction_engine import PHI, PHI_INV # ══════════════════════════════════════════════════════════════════════════════ # 1. ENUMS — action types and trigger conditions # ══════════════════════════════════════════════════════════════════════════════ class ActionType(Enum): BUY_YES = "buy_yes" BUY_NO = "buy_no" SELL_YES = "sell_yes" SELL_NO = "sell_no" HOLD = "hold" HEDGE = "hedge" CASH_OUT = "cash_out" VOTE_YES = "vote_yes" VOTE_NO = "vote_no" SCALE_IN = "scale_in" SCALE_OUT = "scale_out" STOP_LOSS = "stop_loss" TAKE_PROFIT = "take_profit" class TriggerType(Enum): PRICE_ABOVE = "price_above" PRICE_BELOW = "price_below" TIME_AFTER = "time_after" TIME_BEFORE = "time_before" VOLUME_SPIKE = "volume_spike" SENTIMENT_SHIFT = "sentiment_shift" KK_PHASE_TRANSITION = "kk_phase_transition" MOMENTUM_EXHAUSTED = "momentum_exhausted" ARB_DETECTED = "arb_detected" WHALE_MOVE = "whale_move" SPREAD_NARROW = "spread_narrow" SPREAD_WIDEN = "spread_widen" ALWAYS = "always" class CombineLogic(Enum): AND = "and" OR = "or" # ══════════════════════════════════════════════════════════════════════════════ # 2. TRIGGER CONDITIONS — when to fire # ══════════════════════════════════════════════════════════════════════════════ @dataclass class TriggerCondition: """A single condition that must be met for an action to fire.""" trigger_type: TriggerType threshold: float = 0.0 market_id: str = "" venue: str = "" description: str = "" def to_dict(self) -> Dict[str, Any]: return { "type": self.trigger_type.value, "threshold": self.threshold, "market_id": self.market_id, "venue": self.venue, "description": self.description or self._default_description(), } def _default_description(self) -> str: mapping = { TriggerType.PRICE_ABOVE: f"price rises above {self.threshold:.2f}", TriggerType.PRICE_BELOW: f"price drops below {self.threshold:.2f}", TriggerType.TIME_AFTER: f"after timestamp {self.threshold}", TriggerType.TIME_BEFORE: f"before timestamp {self.threshold}", TriggerType.VOLUME_SPIKE: f"volume exceeds {self.threshold:.0f}x normal", TriggerType.SENTIMENT_SHIFT: f"sentiment shifts by {self.threshold:.2f}", TriggerType.KK_PHASE_TRANSITION: "KK phase crosses φ", TriggerType.MOMENTUM_EXHAUSTED: "momentum signal exhausted", TriggerType.ARB_DETECTED: f"arbitrage > {self.threshold:.1%}", TriggerType.WHALE_MOVE: f"whale trade > ${self.threshold:,.0f}", TriggerType.SPREAD_NARROW: f"spread narrows below {self.threshold:.3f}", TriggerType.SPREAD_WIDEN: f"spread widens above {self.threshold:.3f}", TriggerType.ALWAYS: "unconditional", } return mapping.get(self.trigger_type, str(self.trigger_type)) def evaluate(self, market_data: Dict[str, Any]) -> bool: """Evaluate condition against live market data.""" price = market_data.get("price", 0.5) volume = market_data.get("volume_ratio", 1.0) sentiment = market_data.get("sentiment_delta", 0.0) kk_phase = market_data.get("kk_phase", 0.0) spread = market_data.get("spread", 0.0) now = market_data.get("timestamp", time.time()) checks = { TriggerType.PRICE_ABOVE: price > self.threshold, TriggerType.PRICE_BELOW: price < self.threshold, TriggerType.TIME_AFTER: now > self.threshold, TriggerType.TIME_BEFORE: now < self.threshold, TriggerType.VOLUME_SPIKE: volume > self.threshold, TriggerType.SENTIMENT_SHIFT: abs(sentiment) > self.threshold, TriggerType.KK_PHASE_TRANSITION: abs(kk_phase - PHI) < 0.1, TriggerType.MOMENTUM_EXHAUSTED: kk_phase > PHI, TriggerType.ARB_DETECTED: market_data.get("arb_edge", 0) > self.threshold, TriggerType.WHALE_MOVE: market_data.get("whale_size", 0) > self.threshold, TriggerType.SPREAD_NARROW: spread < self.threshold, TriggerType.SPREAD_WIDEN: spread > self.threshold, TriggerType.ALWAYS: True, } return checks.get(self.trigger_type, False) @dataclass class ConditionGroup: """Multiple conditions combined with AND/OR logic.""" conditions: List[TriggerCondition] = field(default_factory=list) logic: CombineLogic = CombineLogic.AND def evaluate(self, market_data: Dict[str, Any]) -> bool: if not self.conditions: return True results = [c.evaluate(market_data) for c in self.conditions] if self.logic == CombineLogic.AND: return all(results) return any(results) def to_dict(self) -> Dict[str, Any]: return { "logic": self.logic.value, "conditions": [c.to_dict() for c in self.conditions], } # ══════════════════════════════════════════════════════════════════════════════ # 3. CONDITIONAL ACTIONS — what to do when triggered # ══════════════════════════════════════════════════════════════════════════════ @dataclass class ConditionalAction: """A single action in an execution plan. If conditions met → execute action with specified sizing. """ action_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8]) action_type: ActionType = ActionType.HOLD conditions: ConditionGroup = field(default_factory=ConditionGroup) market_id: str = "" venue: str = "" size_fraction: float = 0.01 # fraction of bankroll max_size_dollars: float = 1000.0 limit_price: Optional[float] = None expires_at: Optional[float] = None # timestamp priority: int = 0 # higher = execute first causal_reason: str = "" # WHY this action (from causal engine) depends_on: Optional[str] = None # action_id that must fire first cancel_if: Optional[str] = None # action_id that if fired cancels this def to_dict(self) -> Dict[str, Any]: result = { "id": self.action_id, "action": self.action_type.value, "market": self.market_id, "venue": self.venue, "conditions": self.conditions.to_dict(), "sizing": { "fraction": self.size_fraction, "max_dollars": self.max_size_dollars, "limit_price": self.limit_price, }, "priority": self.priority, "reason": self.causal_reason, } if self.expires_at: result["expires_at"] = self.expires_at if self.depends_on: result["depends_on"] = self.depends_on if self.cancel_if: result["cancel_if"] = self.cancel_if return result def to_human_readable(self) -> str: conds = " AND ".join(c._default_description() for c in self.conditions.conditions) if self.conditions.logic == CombineLogic.OR: conds = " OR ".join(c._default_description() for c in self.conditions.conditions) if not conds: conds = "unconditionally" size_desc = f"{self.size_fraction:.0%} of bankroll (max ${self.max_size_dollars:,.0f})" price_desc = f" at limit ${self.limit_price:.2f}" if self.limit_price else "" return ( f"IF {conds} on {self.market_id}:\n" f" → {self.action_type.value.upper()} {size_desc}{price_desc}\n" f" Reason: {self.causal_reason}" ) # ══════════════════════════════════════════════════════════════════════════════ # 4. EXECUTION PLAN — full plan with multiple actions # ══════════════════════════════════════════════════════════════════════════════ @dataclass class ExecutionPlan: """A complete execution plan with conditional actions. Plans are periodic — they evaluate on every market tick and fire actions when conditions are met. """ plan_id: str = field(default_factory=lambda: str(uuid.uuid4())[:12]) name: str = "" description: str = "" actions: List[ConditionalAction] = field(default_factory=list) created_at: float = field(default_factory=time.time) valid_until: Optional[float] = None check_interval_seconds: float = 60.0 # how often to evaluate bankroll: float = 10_000.0 max_total_exposure: float = 0.25 # max 25% of bankroll at risk def add_action(self, action: ConditionalAction) -> "ExecutionPlan": self.actions.append(action) return self def evaluate(self, market_data: Dict[str, Any]) -> List[ConditionalAction]: """Evaluate all actions against current market data. Returns list of actions whose conditions are met. """ now = market_data.get("timestamp", time.time()) if self.valid_until and now > self.valid_until: return [] # plan expired fired_ids = set() pending = sorted(self.actions, key=lambda a: -a.priority) triggered = [] for action in pending: # Check expiration if action.expires_at and now > action.expires_at: continue # Check dependency if action.depends_on and action.depends_on not in fired_ids: continue # Check cancellation if action.cancel_if and action.cancel_if in fired_ids: continue # Evaluate conditions if action.conditions.evaluate(market_data): triggered.append(action) fired_ids.add(action.action_id) return triggered def to_dict(self) -> Dict[str, Any]: return { "plan_id": self.plan_id, "name": self.name, "description": self.description, "created_at": self.created_at, "valid_until": self.valid_until, "check_interval_seconds": self.check_interval_seconds, "bankroll": self.bankroll, "max_exposure": self.max_total_exposure, "actions": [a.to_dict() for a in self.actions], } def to_json(self, indent: int = 2) -> str: return json.dumps(self.to_dict(), indent=indent, default=str) def to_human_readable(self) -> str: lines = [ f"EXECUTION PLAN: {self.name}", f"{'=' * 60}", f"ID: {self.plan_id}", f"Bankroll: ${self.bankroll:,.0f} | Max exposure: {self.max_total_exposure:.0%}", f"Check every: {self.check_interval_seconds:.0f}s", "", ] for i, action in enumerate(self.actions, 1): lines.append(f"--- Action {i} ---") lines.append(action.to_human_readable()) lines.append("") return "\n".join(lines) # ══════════════════════════════════════════════════════════════════════════════ # 5. PLAN BUILDER — fluent API for constructing plans # ══════════════════════════════════════════════════════════════════════════════ class ExecutionPlanBuilder: """Fluent builder for constructing execution plans. Usage: plan = (ExecutionPlanBuilder("Election Trade") .bankroll(10000) .when_price_above("election_2026", 0.70) .then_sell_yes(size=0.05, reason="Take profit at 70%") .when_price_below("election_2026", 0.45) .then_buy_yes(size=0.03, reason="Dip buy below 45%") .when_kk_phase_transition("election_2026") .then_cash_out(reason="Phase transition detected") .expires_in_hours(24) .build()) """ def __init__(self, name: str): self._plan = ExecutionPlan(name=name) self._current_conditions: List[TriggerCondition] = [] self._current_market: str = "" self._current_logic: CombineLogic = CombineLogic.AND def bankroll(self, amount: float) -> "ExecutionPlanBuilder": self._plan.bankroll = amount return self def max_exposure(self, fraction: float) -> "ExecutionPlanBuilder": self._plan.max_total_exposure = fraction return self def check_every(self, seconds: float) -> "ExecutionPlanBuilder": self._plan.check_interval_seconds = seconds return self def expires_in_hours(self, hours: float) -> "ExecutionPlanBuilder": self._plan.valid_until = time.time() + hours * 3600 return self def expires_at(self, timestamp: float) -> "ExecutionPlanBuilder": self._plan.valid_until = timestamp return self # ── Trigger conditions ── def when_price_above(self, market_id: str, price: float) -> "ExecutionPlanBuilder": self._current_market = market_id self._current_conditions = [TriggerCondition(TriggerType.PRICE_ABOVE, price, market_id)] return self def when_price_below(self, market_id: str, price: float) -> "ExecutionPlanBuilder": self._current_market = market_id self._current_conditions = [TriggerCondition(TriggerType.PRICE_BELOW, price, market_id)] return self def when_arb_detected(self, market_id: str, min_edge: float = 0.02) -> "ExecutionPlanBuilder": self._current_market = market_id self._current_conditions = [TriggerCondition(TriggerType.ARB_DETECTED, min_edge, market_id)] return self def when_kk_phase_transition(self, market_id: str) -> "ExecutionPlanBuilder": self._current_market = market_id self._current_conditions = [TriggerCondition(TriggerType.KK_PHASE_TRANSITION, PHI, market_id)] return self def when_momentum_exhausted(self, market_id: str) -> "ExecutionPlanBuilder": self._current_market = market_id self._current_conditions = [TriggerCondition(TriggerType.MOMENTUM_EXHAUSTED, PHI, market_id)] return self def when_whale_move(self, market_id: str, min_size: float = 50_000) -> "ExecutionPlanBuilder": self._current_market = market_id self._current_conditions = [TriggerCondition(TriggerType.WHALE_MOVE, min_size, market_id)] return self def when_volume_spike(self, market_id: str, ratio: float = 3.0) -> "ExecutionPlanBuilder": self._current_market = market_id self._current_conditions = [TriggerCondition(TriggerType.VOLUME_SPIKE, ratio, market_id)] return self def when_sentiment_shift(self, market_id: str, delta: float = 0.3) -> "ExecutionPlanBuilder": self._current_market = market_id self._current_conditions = [TriggerCondition(TriggerType.SENTIMENT_SHIFT, delta, market_id)] return self def and_also(self, trigger_type: TriggerType, threshold: float = 0.0) -> "ExecutionPlanBuilder": self._current_conditions.append( TriggerCondition(trigger_type, threshold, self._current_market)) self._current_logic = CombineLogic.AND return self def or_else(self, trigger_type: TriggerType, threshold: float = 0.0) -> "ExecutionPlanBuilder": self._current_conditions.append( TriggerCondition(trigger_type, threshold, self._current_market)) self._current_logic = CombineLogic.OR return self # ── Actions ── def _add_action(self, action_type: ActionType, size: float = 0.01, max_dollars: float = 1000.0, limit_price: Optional[float] = None, reason: str = "") -> "ExecutionPlanBuilder": action = ConditionalAction( action_type=action_type, conditions=ConditionGroup(list(self._current_conditions), self._current_logic), market_id=self._current_market, size_fraction=size, max_size_dollars=max_dollars, limit_price=limit_price, causal_reason=reason, priority=len(self._plan.actions), ) self._plan.add_action(action) self._current_conditions = [] return self def then_buy_yes(self, size: float = 0.01, reason: str = "") -> "ExecutionPlanBuilder": return self._add_action(ActionType.BUY_YES, size, reason=reason) def then_buy_no(self, size: float = 0.01, reason: str = "") -> "ExecutionPlanBuilder": return self._add_action(ActionType.BUY_NO, size, reason=reason) def then_sell_yes(self, size: float = 0.01, reason: str = "") -> "ExecutionPlanBuilder": return self._add_action(ActionType.SELL_YES, size, reason=reason) def then_sell_no(self, size: float = 0.01, reason: str = "") -> "ExecutionPlanBuilder": return self._add_action(ActionType.SELL_NO, size, reason=reason) def then_cash_out(self, reason: str = "") -> "ExecutionPlanBuilder": return self._add_action(ActionType.CASH_OUT, size=1.0, reason=reason) def then_vote_yes(self, reason: str = "") -> "ExecutionPlanBuilder": return self._add_action(ActionType.VOTE_YES, reason=reason) def then_vote_no(self, reason: str = "") -> "ExecutionPlanBuilder": return self._add_action(ActionType.VOTE_NO, reason=reason) def then_hedge(self, size: float = 0.01, reason: str = "") -> "ExecutionPlanBuilder": return self._add_action(ActionType.HEDGE, size, reason=reason) def then_stop_loss(self, size: float = 1.0, reason: str = "") -> "ExecutionPlanBuilder": return self._add_action(ActionType.STOP_LOSS, size, reason=reason) def then_take_profit(self, size: float = 0.5, reason: str = "") -> "ExecutionPlanBuilder": return self._add_action(ActionType.TAKE_PROFIT, size, reason=reason) def build(self) -> ExecutionPlan: return self._plan # ══════════════════════════════════════════════════════════════════════════════ # 6. PLAN EXECUTION TRACKER — state machine for plan lifecycle # ══════════════════════════════════════════════════════════════════════════════ @dataclass class PlanExecutionLog: """Log entry for a plan action that fired.""" action_id: str action_type: str market_id: str timestamp: float market_data: Dict[str, Any] size_dollars: float reason: str class PlanExecutionTracker: """Tracks plan lifecycle: pending → monitoring → triggered → executed → expired.""" def __init__(self): self._plans: Dict[str, ExecutionPlan] = {} self._execution_log: List[PlanExecutionLog] = [] def register_plan(self, plan: ExecutionPlan) -> None: self._plans[plan.plan_id] = plan def tick(self, market_data: Dict[str, Any]) -> List[PlanExecutionLog]: """Evaluate all registered plans against current market data.""" logs = [] for plan in list(self._plans.values()): triggered = plan.evaluate(market_data) for action in triggered: size = action.size_fraction * plan.bankroll size = min(size, action.max_size_dollars) log = PlanExecutionLog( action_id=action.action_id, action_type=action.action_type.value, market_id=action.market_id, timestamp=time.time(), market_data=market_data, size_dollars=size, reason=action.causal_reason, ) logs.append(log) self._execution_log.append(log) return logs def remove_plan(self, plan_id: str) -> None: self._plans.pop(plan_id, None) @property def active_plans(self) -> int: return len(self._plans) @property def execution_history(self) -> List[PlanExecutionLog]: return self._execution_log # ══════════════════════════════════════════════════════════════════════════════ # 7. EXAMPLE PLAN GENERATOR — creates plans from market analysis # ══════════════════════════════════════════════════════════════════════════════ def generate_arb_plan(market_id: str, yes_venue: str, no_venue: str, yes_price: float, no_price: float, bankroll: float = 10_000) -> ExecutionPlan: """Generate a cross-venue arbitrage execution plan.""" edge = 1.0 - yes_price - no_price return (ExecutionPlanBuilder(f"Arb: {market_id}") .bankroll(bankroll) .when_price_below(market_id, yes_price + 0.02) .then_buy_yes(size=0.03, reason=f"Cross-venue arb edge {edge:.1%}") .when_price_below(market_id, no_price + 0.02) .then_buy_no(size=0.03, reason=f"Hedge NO side, lock in {edge:.1%}") .when_price_above(market_id, yes_price + 0.10) .then_sell_yes(size=0.03, reason="Take profit on YES leg") .when_kk_phase_transition(market_id) .then_cash_out(reason="Phase transition — exit all") .expires_in_hours(4) .build()) def generate_momentum_plan(market_id: str, current_price: float, trend_direction: str = "up", bankroll: float = 10_000) -> ExecutionPlan: """Generate a momentum-following execution plan.""" builder = ExecutionPlanBuilder(f"Momentum: {market_id}").bankroll(bankroll) if trend_direction == "up": builder.when_price_above(market_id, current_price + 0.05) builder.then_buy_yes(size=0.02, reason="Momentum continuation") builder.when_price_above(market_id, current_price + 0.15) builder.then_take_profit(size=0.5, reason="Take profit at +15%") builder.when_price_below(market_id, current_price - 0.05) builder.then_stop_loss(reason="Stop loss at -5%") else: builder.when_price_below(market_id, current_price - 0.05) builder.then_buy_no(size=0.02, reason="Downward momentum") builder.when_price_below(market_id, current_price - 0.15) builder.then_take_profit(size=0.5, reason="Take profit at -15%") builder.when_price_above(market_id, current_price + 0.05) builder.then_stop_loss(reason="Stop loss at +5%") builder.when_momentum_exhausted(market_id) builder.then_cash_out(reason="KK phase > φ — momentum exhausted") builder.expires_in_hours(8) return builder.build() def generate_event_plan(market_id: str, event_time: float, bankroll: float = 10_000) -> ExecutionPlan: """Generate a pre-event / post-event execution plan.""" pre_event = event_time - 3600 # 1 hour before return (ExecutionPlanBuilder(f"Event: {market_id}") .bankroll(bankroll) # Pre-event: build position .when_sentiment_shift(market_id, delta=0.3) .then_buy_yes(size=0.02, reason="Strong positive sentiment pre-event") .when_volume_spike(market_id, ratio=5.0) .then_buy_yes(size=0.01, reason="Volume surge indicates insider knowledge") # During event: scale .when_price_above(market_id, 0.75) .then_take_profit(size=0.5, reason="High confidence — lock in gains") .when_price_below(market_id, 0.25) .then_stop_loss(reason="Event went against us") # Post-event: exit .when_kk_phase_transition(market_id) .then_cash_out(reason="Post-event phase transition") .expires_in_hours(12) .build())