Spaces:
Build error
Build error
"""Strategy Code Generator — produces agent-readable execution plans.

Generates structured conditional execution plans that another agent
(or automated execution system) can parse and execute:
    "If market X reaches price Y within Z minutes, then BUY/SELL/CASH OUT"

Plans are expressed as a DAG of ConditionalAction nodes with:
  - Trigger conditions (price, time, volume, sentiment, KK phase)
  - Actions (buy, sell, hold, hedge, cash_out, vote_yes, vote_no)
  - Deadlines and expiration
  - Escalation rules (Q-learner sized)
  - Causal reasoning (why this action)

Output formats:
  - Structured Python dict (for agent consumption)
  - JSON (for API/webhook delivery)
  - Human-readable plan text

Design patterns:
  Builder: ExecutionPlanBuilder for fluent plan construction
  Strategy: TriggerCondition ABC for different market triggers
  Chain: ConditionChain links multiple conditions with AND/OR
  Observer: PlanExecutionTracker monitors plan state machine

Neurosymbolic: plans are SYMBOLIC rule structures; triggers are NEURAL KAN signals.
"""
| from __future__ import annotations | |
| import json | |
| import time | |
| import uuid | |
| from abc import ABC, abstractmethod | |
| from dataclasses import dataclass, field | |
| from enum import Enum, auto | |
| from typing import Any, Callable, Dict, List, Optional, Tuple | |
| from prediction_engine import PHI, PHI_INV | |
# ──────────────────────────────────────────────────────────────────────────────
# 1. ENUMS — action types and trigger conditions
# ──────────────────────────────────────────────────────────────────────────────
class ActionType(Enum):
    """Closed set of actions a plan node can request.

    Each member's value is its lowercase wire name.
    """

    # Outright position entries and exits
    BUY_YES = "buy_yes"
    BUY_NO = "buy_no"
    SELL_YES = "sell_yes"
    SELL_NO = "sell_no"

    # Position management
    HOLD = "hold"
    HEDGE = "hedge"
    CASH_OUT = "cash_out"

    # Voting actions
    VOTE_YES = "vote_yes"
    VOTE_NO = "vote_no"

    # Incremental sizing and protective exits
    SCALE_IN = "scale_in"
    SCALE_OUT = "scale_out"
    STOP_LOSS = "stop_loss"
    TAKE_PROFIT = "take_profit"
class TriggerType(Enum):
    """Closed set of market events a trigger condition can watch.

    Each member's value is its lowercase wire name.
    """

    # Price level
    PRICE_ABOVE = "price_above"
    PRICE_BELOW = "price_below"

    # Wall-clock / market timestamp
    TIME_AFTER = "time_after"
    TIME_BEFORE = "time_before"

    # Flow and sentiment signals
    VOLUME_SPIKE = "volume_spike"
    SENTIMENT_SHIFT = "sentiment_shift"

    # KK phase signals
    KK_PHASE_TRANSITION = "kk_phase_transition"
    MOMENTUM_EXHAUSTED = "momentum_exhausted"

    # Microstructure signals
    ARB_DETECTED = "arb_detected"
    WHALE_MOVE = "whale_move"
    SPREAD_NARROW = "spread_narrow"
    SPREAD_WIDEN = "spread_widen"

    # Unconditional trigger
    ALWAYS = "always"
class CombineLogic(Enum):
    """How the members of a condition group are combined."""

    AND = "and"  # every condition must hold
    OR = "or"    # at least one condition must hold
# ──────────────────────────────────────────────────────────────────────────────
# 2. TRIGGER CONDITIONS — when to fire
# ──────────────────────────────────────────────────────────────────────────────
@dataclass
class TriggerCondition:
    """A single condition that must be met for an action to fire.

    Attributes:
        trigger_type: Kind of market event being watched.
        threshold: Numeric cut-off compared against the matching
            ``market_data`` field. Not consulted by KK_PHASE_TRANSITION,
            MOMENTUM_EXHAUSTED or ALWAYS.
        market_id: Market identifier; serialized by ``to_dict`` but not
            used by ``evaluate``.
        venue: Venue identifier; serialized only.
        description: Optional text that replaces the auto-generated
            description in ``to_dict``.
    """

    trigger_type: TriggerType
    threshold: float = 0.0
    market_id: str = ""
    venue: str = ""
    description: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the condition to a plain dict."""
        return {
            "type": self.trigger_type.value,
            "threshold": self.threshold,
            "market_id": self.market_id,
            "venue": self.venue,
            "description": self.description or self._default_description(),
        }

    def _default_description(self) -> str:
        """Build a description from the trigger type and threshold."""
        mapping = {
            TriggerType.PRICE_ABOVE: f"price rises above {self.threshold:.2f}",
            TriggerType.PRICE_BELOW: f"price drops below {self.threshold:.2f}",
            TriggerType.TIME_AFTER: f"after timestamp {self.threshold}",
            TriggerType.TIME_BEFORE: f"before timestamp {self.threshold}",
            TriggerType.VOLUME_SPIKE: f"volume exceeds {self.threshold:.0f}x normal",
            TriggerType.SENTIMENT_SHIFT: f"sentiment shifts by {self.threshold:.2f}",
            TriggerType.KK_PHASE_TRANSITION: "KK phase crosses φ",
            TriggerType.MOMENTUM_EXHAUSTED: "momentum signal exhausted",
            TriggerType.ARB_DETECTED: f"arbitrage > {self.threshold:.1%}",
            TriggerType.WHALE_MOVE: f"whale trade > ${self.threshold:,.0f}",
            TriggerType.SPREAD_NARROW: f"spread narrows below {self.threshold:.3f}",
            TriggerType.SPREAD_WIDEN: f"spread widens above {self.threshold:.3f}",
            TriggerType.ALWAYS: "unconditional",
        }
        return mapping.get(self.trigger_type, str(self.trigger_type))

    def evaluate(self, market_data: Dict[str, Any]) -> bool:
        """Evaluate the condition against live market data.

        Only the field relevant to ``trigger_type`` is read, so a missing
        or malformed value for an unrelated field cannot break the check.
        Missing fields fall back to neutral defaults (price 0.5, volume
        ratio 1.0, everything else 0, timestamp = current time).

        Args:
            market_data: Mapping of market fields for the current tick.

        Returns:
            True when the condition holds; False for an unknown trigger type.
        """
        kind = self.trigger_type
        limit = self.threshold

        if kind is TriggerType.ALWAYS:
            return True
        if kind is TriggerType.PRICE_ABOVE:
            return market_data.get("price", 0.5) > limit
        if kind is TriggerType.PRICE_BELOW:
            return market_data.get("price", 0.5) < limit
        if kind is TriggerType.TIME_AFTER:
            return market_data.get("timestamp", time.time()) > limit
        if kind is TriggerType.TIME_BEFORE:
            return market_data.get("timestamp", time.time()) < limit
        if kind is TriggerType.VOLUME_SPIKE:
            return market_data.get("volume_ratio", 1.0) > limit
        if kind is TriggerType.SENTIMENT_SHIFT:
            # Direction-agnostic: a shift either way counts.
            return abs(market_data.get("sentiment_delta", 0.0)) > limit
        if kind is TriggerType.KK_PHASE_TRANSITION:
            # Fires within a fixed 0.1 band around PHI; threshold is ignored.
            return abs(market_data.get("kk_phase", 0.0) - PHI) < 0.1
        if kind is TriggerType.MOMENTUM_EXHAUSTED:
            return market_data.get("kk_phase", 0.0) > PHI
        if kind is TriggerType.ARB_DETECTED:
            return market_data.get("arb_edge", 0) > limit
        if kind is TriggerType.WHALE_MOVE:
            return market_data.get("whale_size", 0) > limit
        if kind is TriggerType.SPREAD_NARROW:
            return market_data.get("spread", 0.0) < limit
        if kind is TriggerType.SPREAD_WIDEN:
            return market_data.get("spread", 0.0) > limit
        return False
@dataclass
class ConditionGroup:
    """Multiple conditions combined with AND/OR logic.

    Attributes:
        conditions: Member conditions; an empty group always evaluates True.
        logic: AND requires every member to hold, OR requires at least one.
    """

    conditions: List[TriggerCondition] = field(default_factory=list)
    logic: CombineLogic = CombineLogic.AND

    def evaluate(self, market_data: Dict[str, Any]) -> bool:
        """Return whether the group as a whole holds for ``market_data``."""
        if not self.conditions:
            return True  # nothing to check: unconditional
        outcomes = (cond.evaluate(market_data) for cond in self.conditions)
        if self.logic == CombineLogic.AND:
            return all(outcomes)
        return any(outcomes)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the group and its member conditions to a plain dict."""
        return {
            "logic": self.logic.value,
            "conditions": [cond.to_dict() for cond in self.conditions],
        }
# ──────────────────────────────────────────────────────────────────────────────
# 3. CONDITIONAL ACTIONS — what to do when triggered
# ──────────────────────────────────────────────────────────────────────────────
@dataclass
class ConditionalAction:
    """A single action in an execution plan.

    If conditions met → execute action with specified sizing.
    """

    action_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    action_type: ActionType = ActionType.HOLD
    conditions: ConditionGroup = field(default_factory=ConditionGroup)
    market_id: str = ""
    venue: str = ""
    size_fraction: float = 0.01  # fraction of bankroll
    max_size_dollars: float = 1000.0
    limit_price: Optional[float] = None
    expires_at: Optional[float] = None  # timestamp
    priority: int = 0  # higher = execute first
    causal_reason: str = ""  # WHY this action (from causal engine)
    depends_on: Optional[str] = None  # action_id that must fire first
    cancel_if: Optional[str] = None  # action_id that if fired cancels this

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the action to a plain dict.

        ``expires_at``, ``depends_on`` and ``cancel_if`` are only included
        when they are set.
        """
        result = {
            "id": self.action_id,
            "action": self.action_type.value,
            "market": self.market_id,
            "venue": self.venue,
            "conditions": self.conditions.to_dict(),
            "sizing": {
                "fraction": self.size_fraction,
                "max_dollars": self.max_size_dollars,
                "limit_price": self.limit_price,
            },
            "priority": self.priority,
            "reason": self.causal_reason,
        }
        # Compare against None so a legitimate 0.0 timestamp is not dropped.
        if self.expires_at is not None:
            result["expires_at"] = self.expires_at
        if self.depends_on:
            result["depends_on"] = self.depends_on
        if self.cancel_if:
            result["cancel_if"] = self.cancel_if
        return result

    def to_human_readable(self) -> str:
        """Render the action as an ``IF … → ACTION`` text block.

        Each condition uses its custom description when one was given and
        the auto-generated one otherwise, matching ``TriggerCondition.to_dict``.
        """
        joiner = " OR " if self.conditions.logic == CombineLogic.OR else " AND "
        conds = joiner.join(
            cond.description or cond._default_description()
            for cond in self.conditions.conditions
        )
        if not conds:
            conds = "unconditionally"
        size_desc = f"{self.size_fraction:.0%} of bankroll (max ${self.max_size_dollars:,.0f})"
        price_desc = (
            f" at limit ${self.limit_price:.2f}" if self.limit_price is not None else ""
        )
        return (
            f"IF {conds} on {self.market_id}:\n"
            f" → {self.action_type.value.upper()} {size_desc}{price_desc}\n"
            f" Reason: {self.causal_reason}"
        )
# ──────────────────────────────────────────────────────────────────────────────
# 4. EXECUTION PLAN — full plan with multiple actions
# ──────────────────────────────────────────────────────────────────────────────
@dataclass
class ExecutionPlan:
    """A complete execution plan with conditional actions.

    Plans are periodic — they evaluate on every market tick and
    fire actions when conditions are met.
    """

    plan_id: str = field(default_factory=lambda: str(uuid.uuid4())[:12])
    name: str = ""
    description: str = ""
    actions: "List[ConditionalAction]" = field(default_factory=list)
    created_at: float = field(default_factory=time.time)
    valid_until: Optional[float] = None
    check_interval_seconds: float = 60.0  # how often to evaluate
    bankroll: float = 10_000.0
    max_total_exposure: float = 0.25  # max 25% of bankroll at risk

    def add_action(self, action: "ConditionalAction") -> "ExecutionPlan":
        """Append ``action`` to the plan and return the plan for chaining."""
        self.actions.append(action)
        return self

    def evaluate(self, market_data: Dict[str, Any]) -> "List[ConditionalAction]":
        """Evaluate all actions against current market data.

        Actions are visited in descending ``priority`` order (ties keep
        insertion order). ``depends_on`` and ``cancel_if`` are resolved only
        against actions that fired earlier in this same call; no state is
        carried between calls.

        Args:
            market_data: Market fields for the current tick. ``timestamp``
                is used as "now" when present, otherwise the current time.

        Returns:
            The actions whose conditions are met, in firing order. Empty
            when the plan itself has expired.
        """
        now = market_data.get("timestamp", time.time())
        # Compare against None so a 0.0 deadline is honoured, not ignored.
        if self.valid_until is not None and now > self.valid_until:
            return []  # plan expired

        fired_ids = set()
        triggered = []
        for action in sorted(self.actions, key=lambda a: a.priority, reverse=True):
            if action.expires_at is not None and now > action.expires_at:
                continue  # action expired
            if action.depends_on and action.depends_on not in fired_ids:
                continue  # prerequisite has not fired in this pass
            if action.cancel_if and action.cancel_if in fired_ids:
                continue  # cancelled by an action that already fired
            if action.conditions.evaluate(market_data):
                triggered.append(action)
                fired_ids.add(action.action_id)
        return triggered

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the plan and all of its actions to a plain dict."""
        return {
            "plan_id": self.plan_id,
            "name": self.name,
            "description": self.description,
            "created_at": self.created_at,
            "valid_until": self.valid_until,
            "check_interval_seconds": self.check_interval_seconds,
            "bankroll": self.bankroll,
            "max_exposure": self.max_total_exposure,
            "actions": [action.to_dict() for action in self.actions],
        }

    def to_json(self, indent: int = 2) -> str:
        """Serialize the plan to JSON; non-JSON values fall back to ``str``."""
        return json.dumps(self.to_dict(), indent=indent, default=str)

    def to_human_readable(self) -> str:
        """Render the plan header followed by each action as text."""
        lines = [
            f"EXECUTION PLAN: {self.name}",
            f"{'=' * 60}",
            f"ID: {self.plan_id}",
            f"Bankroll: ${self.bankroll:,.0f} | Max exposure: {self.max_total_exposure:.0%}",
            f"Check every: {self.check_interval_seconds:.0f}s",
            "",
        ]
        for index, action in enumerate(self.actions, 1):
            lines.extend(
                (f"--- Action {index} ---", action.to_human_readable(), "")
            )
        return "\n".join(lines)
# ──────────────────────────────────────────────────────────────────────────────
# 5. PLAN BUILDER — fluent API for constructing plans
# ──────────────────────────────────────────────────────────────────────────────
class ExecutionPlanBuilder:
    """Fluent builder for constructing execution plans.

    A ``when_*`` call starts a fresh condition list for a market,
    ``and_also`` / ``or_else`` extend it, and a ``then_*`` call turns the
    pending conditions into an action and clears them.

    Usage:
        plan = (ExecutionPlanBuilder("Election Trade")
                .bankroll(10000)
                .when_price_above("election_2026", 0.70)
                .then_sell_yes(size=0.05, reason="Take profit at 70%")
                .when_price_below("election_2026", 0.45)
                .then_buy_yes(size=0.03, reason="Dip buy below 45%")
                .when_kk_phase_transition("election_2026")
                .then_cash_out(reason="Phase transition detected")
                .expires_in_hours(24)
                .build())
    """

    def __init__(self, name: str):
        self._plan = ExecutionPlan(name=name)
        self._current_conditions: List[TriggerCondition] = []
        self._current_market: str = ""
        self._current_logic: CombineLogic = CombineLogic.AND

    # ── Plan-level settings ──

    def bankroll(self, amount: float) -> "ExecutionPlanBuilder":
        """Set the plan's bankroll."""
        self._plan.bankroll = amount
        return self

    def max_exposure(self, fraction: float) -> "ExecutionPlanBuilder":
        """Set the plan's maximum total exposure as a bankroll fraction."""
        self._plan.max_total_exposure = fraction
        return self

    def check_every(self, seconds: float) -> "ExecutionPlanBuilder":
        """Set the plan's evaluation interval in seconds."""
        self._plan.check_interval_seconds = seconds
        return self

    def expires_in_hours(self, hours: float) -> "ExecutionPlanBuilder":
        """Expire the plan ``hours`` from the moment this method is called."""
        self._plan.valid_until = time.time() + hours * 3600
        return self

    def expires_at(self, timestamp: float) -> "ExecutionPlanBuilder":
        """Expire the plan at an absolute timestamp."""
        self._plan.valid_until = timestamp
        return self

    # ── Trigger conditions ──

    def _begin(self, trigger_type: TriggerType, threshold: float,
               market_id: str) -> "ExecutionPlanBuilder":
        """Start a fresh single-condition list for ``market_id``."""
        self._current_market = market_id
        self._current_conditions = [
            TriggerCondition(trigger_type, threshold, market_id)
        ]
        # A new condition list must not inherit OR from an earlier chain.
        self._current_logic = CombineLogic.AND
        return self

    def when_price_above(self, market_id: str, price: float) -> "ExecutionPlanBuilder":
        return self._begin(TriggerType.PRICE_ABOVE, price, market_id)

    def when_price_below(self, market_id: str, price: float) -> "ExecutionPlanBuilder":
        return self._begin(TriggerType.PRICE_BELOW, price, market_id)

    def when_arb_detected(self, market_id: str, min_edge: float = 0.02) -> "ExecutionPlanBuilder":
        return self._begin(TriggerType.ARB_DETECTED, min_edge, market_id)

    def when_kk_phase_transition(self, market_id: str) -> "ExecutionPlanBuilder":
        return self._begin(TriggerType.KK_PHASE_TRANSITION, PHI, market_id)

    def when_momentum_exhausted(self, market_id: str) -> "ExecutionPlanBuilder":
        return self._begin(TriggerType.MOMENTUM_EXHAUSTED, PHI, market_id)

    def when_whale_move(self, market_id: str, min_size: float = 50_000) -> "ExecutionPlanBuilder":
        return self._begin(TriggerType.WHALE_MOVE, min_size, market_id)

    def when_volume_spike(self, market_id: str, ratio: float = 3.0) -> "ExecutionPlanBuilder":
        return self._begin(TriggerType.VOLUME_SPIKE, ratio, market_id)

    def when_sentiment_shift(self, market_id: str, delta: float = 0.3) -> "ExecutionPlanBuilder":
        return self._begin(TriggerType.SENTIMENT_SHIFT, delta, market_id)

    def and_also(self, trigger_type: TriggerType, threshold: float = 0.0) -> "ExecutionPlanBuilder":
        """Add a condition on the current market and combine with AND.

        The group has a single logic; the last ``and_also`` / ``or_else``
        call decides it for every pending condition.
        """
        self._current_conditions.append(
            TriggerCondition(trigger_type, threshold, self._current_market))
        self._current_logic = CombineLogic.AND
        return self

    def or_else(self, trigger_type: TriggerType, threshold: float = 0.0) -> "ExecutionPlanBuilder":
        """Add a condition on the current market and combine with OR.

        The group has a single logic; the last ``and_also`` / ``or_else``
        call decides it for every pending condition.
        """
        self._current_conditions.append(
            TriggerCondition(trigger_type, threshold, self._current_market))
        self._current_logic = CombineLogic.OR
        return self

    # ── Actions ──

    def _add_action(self, action_type: ActionType, size: float = 0.01,
                    max_dollars: float = 1000.0, limit_price: Optional[float] = None,
                    reason: str = "") -> "ExecutionPlanBuilder":
        """Turn the pending conditions into an action and add it to the plan.

        Priority is the number of actions already in the plan, so actions
        added later carry a higher priority value.
        """
        action = ConditionalAction(
            action_type=action_type,
            conditions=ConditionGroup(list(self._current_conditions), self._current_logic),
            market_id=self._current_market,
            size_fraction=size,
            max_size_dollars=max_dollars,
            limit_price=limit_price,
            causal_reason=reason,
            priority=len(self._plan.actions),
        )
        self._plan.add_action(action)
        # Clear both pieces of pending state so nothing leaks into the
        # next action's condition group.
        self._current_conditions = []
        self._current_logic = CombineLogic.AND
        return self

    def then_buy_yes(self, size: float = 0.01, reason: str = "") -> "ExecutionPlanBuilder":
        return self._add_action(ActionType.BUY_YES, size, reason=reason)

    def then_buy_no(self, size: float = 0.01, reason: str = "") -> "ExecutionPlanBuilder":
        return self._add_action(ActionType.BUY_NO, size, reason=reason)

    def then_sell_yes(self, size: float = 0.01, reason: str = "") -> "ExecutionPlanBuilder":
        return self._add_action(ActionType.SELL_YES, size, reason=reason)

    def then_sell_no(self, size: float = 0.01, reason: str = "") -> "ExecutionPlanBuilder":
        return self._add_action(ActionType.SELL_NO, size, reason=reason)

    def then_cash_out(self, reason: str = "") -> "ExecutionPlanBuilder":
        return self._add_action(ActionType.CASH_OUT, size=1.0, reason=reason)

    def then_vote_yes(self, reason: str = "") -> "ExecutionPlanBuilder":
        return self._add_action(ActionType.VOTE_YES, reason=reason)

    def then_vote_no(self, reason: str = "") -> "ExecutionPlanBuilder":
        return self._add_action(ActionType.VOTE_NO, reason=reason)

    def then_hedge(self, size: float = 0.01, reason: str = "") -> "ExecutionPlanBuilder":
        return self._add_action(ActionType.HEDGE, size, reason=reason)

    def then_stop_loss(self, size: float = 1.0, reason: str = "") -> "ExecutionPlanBuilder":
        return self._add_action(ActionType.STOP_LOSS, size, reason=reason)

    def then_take_profit(self, size: float = 0.5, reason: str = "") -> "ExecutionPlanBuilder":
        return self._add_action(ActionType.TAKE_PROFIT, size, reason=reason)

    def build(self) -> ExecutionPlan:
        """Return the plan assembled so far."""
        return self._plan
# ──────────────────────────────────────────────────────────────────────────────
# 6. PLAN EXECUTION TRACKER — state machine for plan lifecycle
# ──────────────────────────────────────────────────────────────────────────────
@dataclass
class PlanExecutionLog:
    """Log entry for a plan action that fired."""

    action_id: str
    action_type: str  # wire name of the action, e.g. "buy_yes"
    market_id: str
    timestamp: float
    market_data: Dict[str, Any]  # market fields seen when the action fired
    size_dollars: float
    reason: str
class PlanExecutionTracker:
    """Tracks plan lifecycle: pending → monitoring → triggered → executed → expired.

    Holds registered plans keyed by ``plan_id`` and an append-only log of
    every action that fired during ``tick``.
    """

    def __init__(self):
        self._plans: Dict[str, ExecutionPlan] = {}
        self._execution_log: List[PlanExecutionLog] = []

    def register_plan(self, plan: "ExecutionPlan") -> None:
        """Register ``plan``; a plan with the same ``plan_id`` is replaced."""
        self._plans[plan.plan_id] = plan

    def tick(self, market_data: Dict[str, Any]) -> "List[PlanExecutionLog]":
        """Evaluate all registered plans against current market data.

        Each fired action is sized as ``size_fraction * bankroll`` capped at
        ``max_size_dollars`` and recorded in the execution log.

        Args:
            market_data: Market fields for the current tick.

        Returns:
            Log entries created by this tick, in firing order.
        """
        # Snapshot once so later mutation of the caller's dict cannot
        # rewrite entries that are already in the log.
        snapshot = dict(market_data)
        logs = []
        for plan in list(self._plans.values()):
            for action in plan.evaluate(market_data):
                size = min(action.size_fraction * plan.bankroll,
                           action.max_size_dollars)
                entry = PlanExecutionLog(
                    action_id=action.action_id,
                    action_type=action.action_type.value,
                    market_id=action.market_id,
                    timestamp=time.time(),
                    market_data=snapshot,
                    size_dollars=size,
                    reason=action.causal_reason,
                )
                logs.append(entry)
                self._execution_log.append(entry)
        return logs

    def remove_plan(self, plan_id: str) -> None:
        """Unregister a plan; unknown ids are ignored."""
        self._plans.pop(plan_id, None)

    def active_plans(self) -> int:
        """Return the number of registered plans."""
        return len(self._plans)

    def execution_history(self) -> "List[PlanExecutionLog]":
        """Return the internal execution log (not a copy)."""
        return self._execution_log
# ──────────────────────────────────────────────────────────────────────────────
# 7. EXAMPLE PLAN GENERATOR — creates plans from market analysis
# ──────────────────────────────────────────────────────────────────────────────
def generate_arb_plan(market_id: str, yes_venue: str, no_venue: str,
                      yes_price: float, no_price: float,
                      bankroll: float = 10_000) -> ExecutionPlan:
    """Generate a cross-venue arbitrage execution plan.

    Args:
        market_id: Market both legs trade.
        yes_venue: Venue assigned to the YES-side actions (buy/sell YES).
        no_venue: Venue assigned to the NO-side action (buy NO).
        yes_price: Observed YES price; entry fires below ``yes_price + 0.02``
            and take-profit above ``yes_price + 0.10``.
        no_price: Observed NO price; entry fires below ``no_price + 0.02``.
        bankroll: Plan bankroll.

    Returns:
        A plan with two entry legs, a YES take-profit and a cash-out on
        KK phase transition, expiring four hours after creation.
    """
    edge = 1.0 - yes_price - no_price
    plan = (ExecutionPlanBuilder(f"Arb: {market_id}")
            .bankroll(bankroll)
            .when_price_below(market_id, yes_price + 0.02)
            .then_buy_yes(size=0.03, reason=f"Cross-venue arb edge {edge:.1%}")
            .when_price_below(market_id, no_price + 0.02)
            .then_buy_no(size=0.03, reason=f"Hedge NO side, lock in {edge:.1%}")
            .when_price_above(market_id, yes_price + 0.10)
            .then_sell_yes(size=0.03, reason="Take profit on YES leg")
            .when_kk_phase_transition(market_id)
            .then_cash_out(reason="Phase transition — exit all")
            .expires_in_hours(4)
            .build())

    # The builder has no venue hook, so route each leg after the fact.
    # The cash-out spans both legs and is left without a venue.
    venue_by_action = {
        ActionType.BUY_YES: yes_venue,
        ActionType.SELL_YES: yes_venue,
        ActionType.BUY_NO: no_venue,
    }
    for action in plan.actions:
        venue = venue_by_action.get(action.action_type)
        if not venue:
            continue
        action.venue = venue
        for cond in action.conditions.conditions:
            cond.venue = venue
    return plan
def generate_momentum_plan(market_id: str, current_price: float,
                           trend_direction: str = "up",
                           bankroll: float = 10_000) -> ExecutionPlan:
    """Generate a momentum-following execution plan.

    Args:
        market_id: Market to trade.
        current_price: Reference price the entry, take-profit and stop-loss
            levels are offset from.
        trend_direction: ``"up"`` builds the long-YES variant; any other
            value builds the buy-NO (downward) variant.
        bankroll: Plan bankroll.

    Returns:
        A plan with entry, take-profit, stop-loss and a cash-out on
        momentum exhaustion, expiring eight hours after creation.
    """
    builder = ExecutionPlanBuilder(f"Momentum: {market_id}").bankroll(bankroll)

    if trend_direction == "up":
        (builder
         .when_price_above(market_id, current_price + 0.05)
         .then_buy_yes(size=0.02, reason="Momentum continuation")
         .when_price_above(market_id, current_price + 0.15)
         .then_take_profit(size=0.5, reason="Take profit at +15%")
         .when_price_below(market_id, current_price - 0.05)
         .then_stop_loss(reason="Stop loss at -5%"))
    else:
        (builder
         .when_price_below(market_id, current_price - 0.05)
         .then_buy_no(size=0.02, reason="Downward momentum")
         .when_price_below(market_id, current_price - 0.15)
         .then_take_profit(size=0.5, reason="Take profit at -15%")
         .when_price_above(market_id, current_price + 0.05)
         .then_stop_loss(reason="Stop loss at +5%"))

    # Both variants exit when the momentum signal is exhausted.
    return (builder
            .when_momentum_exhausted(market_id)
            .then_cash_out(reason="KK phase > φ — momentum exhausted")
            .expires_in_hours(8)
            .build())
def generate_event_plan(market_id: str, event_time: float,
                        bankroll: float = 10_000) -> ExecutionPlan:
    """Generate a pre-event / post-event execution plan.

    Args:
        market_id: Market to trade.
        event_time: Timestamp of the event.
            NOTE(review): currently not used by any trigger — the phases
            below are distinguished only by their signals, and expiry is
            12 hours from plan creation. Confirm whether time gating
            (TIME_BEFORE / TIME_AFTER) was intended.
        bankroll: Plan bankroll.

    Returns:
        A plan with two entry triggers, a take-profit, a stop-loss and a
        cash-out on KK phase transition.
    """
    return (ExecutionPlanBuilder(f"Event: {market_id}")
            .bankroll(bankroll)
            # Pre-event: build position
            .when_sentiment_shift(market_id, delta=0.3)
            .then_buy_yes(size=0.02, reason="Strong positive sentiment pre-event")
            .when_volume_spike(market_id, ratio=5.0)
            .then_buy_yes(size=0.01, reason="Volume surge indicates insider knowledge")
            # During event: scale
            .when_price_above(market_id, 0.75)
            .then_take_profit(size=0.5, reason="High confidence — lock in gains")
            .when_price_below(market_id, 0.25)
            .then_stop_loss(reason="Event went against us")
            # Post-event: exit
            .when_kk_phase_transition(market_id)
            .then_cash_out(reason="Post-event phase transition")
            .expires_in_hours(12)
            .build())