JohnGenetica's picture
Deploy ANE KAN runtime Space
201cf4d verified
"""Strategy Code Generator β€” produces agent-readable execution plans.
Generates structured conditional execution plans that another agent
(or automated execution system) can parse and execute:
"If market X reaches price Y within Z minutes, then BUY/SELL/CASH OUT"
Plans are expressed as a DAG of ConditionalAction nodes with:
- Trigger conditions (price, time, volume, sentiment, KK phase)
- Actions (buy, sell, hold, hedge, cash_out, vote_yes, vote_no)
- Deadlines and expiration
- Escalation rules (Q-learner sized)
- Causal reasoning (why this action)
Output formats:
- Structured Python dict (for agent consumption)
- JSON (for API/webhook delivery)
- Human-readable plan text
Design patterns:
Builder: ExecutionPlanBuilder for fluent plan construction
Strategy: TriggerCondition ABC for different market triggers
Chain: ConditionChain links multiple conditions with AND/OR
Observer: PlanExecutionTracker monitors plan state machine
Neurosymbolic: plans are SYMBOLIC rule structures; triggers are NEURAL KAN signals.
"""
from __future__ import annotations
import json
import time
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Any, Callable, Dict, List, Optional, Tuple
from prediction_engine import PHI, PHI_INV
# ══════════════════════════════════════════════════════════════════════════════
# 1. ENUMS β€” action types and trigger conditions
# ══════════════════════════════════════════════════════════════════════════════
class ActionType(Enum):
BUY_YES = "buy_yes"
BUY_NO = "buy_no"
SELL_YES = "sell_yes"
SELL_NO = "sell_no"
HOLD = "hold"
HEDGE = "hedge"
CASH_OUT = "cash_out"
VOTE_YES = "vote_yes"
VOTE_NO = "vote_no"
SCALE_IN = "scale_in"
SCALE_OUT = "scale_out"
STOP_LOSS = "stop_loss"
TAKE_PROFIT = "take_profit"
class TriggerType(Enum):
PRICE_ABOVE = "price_above"
PRICE_BELOW = "price_below"
TIME_AFTER = "time_after"
TIME_BEFORE = "time_before"
VOLUME_SPIKE = "volume_spike"
SENTIMENT_SHIFT = "sentiment_shift"
KK_PHASE_TRANSITION = "kk_phase_transition"
MOMENTUM_EXHAUSTED = "momentum_exhausted"
ARB_DETECTED = "arb_detected"
WHALE_MOVE = "whale_move"
SPREAD_NARROW = "spread_narrow"
SPREAD_WIDEN = "spread_widen"
ALWAYS = "always"
class CombineLogic(Enum):
AND = "and"
OR = "or"
# ══════════════════════════════════════════════════════════════════════════════
# 2. TRIGGER CONDITIONS β€” when to fire
# ══════════════════════════════════════════════════════════════════════════════
@dataclass
class TriggerCondition:
"""A single condition that must be met for an action to fire."""
trigger_type: TriggerType
threshold: float = 0.0
market_id: str = ""
venue: str = ""
description: str = ""
def to_dict(self) -> Dict[str, Any]:
return {
"type": self.trigger_type.value,
"threshold": self.threshold,
"market_id": self.market_id,
"venue": self.venue,
"description": self.description or self._default_description(),
}
def _default_description(self) -> str:
mapping = {
TriggerType.PRICE_ABOVE: f"price rises above {self.threshold:.2f}",
TriggerType.PRICE_BELOW: f"price drops below {self.threshold:.2f}",
TriggerType.TIME_AFTER: f"after timestamp {self.threshold}",
TriggerType.TIME_BEFORE: f"before timestamp {self.threshold}",
TriggerType.VOLUME_SPIKE: f"volume exceeds {self.threshold:.0f}x normal",
TriggerType.SENTIMENT_SHIFT: f"sentiment shifts by {self.threshold:.2f}",
TriggerType.KK_PHASE_TRANSITION: "KK phase crosses Ο†",
TriggerType.MOMENTUM_EXHAUSTED: "momentum signal exhausted",
TriggerType.ARB_DETECTED: f"arbitrage > {self.threshold:.1%}",
TriggerType.WHALE_MOVE: f"whale trade > ${self.threshold:,.0f}",
TriggerType.SPREAD_NARROW: f"spread narrows below {self.threshold:.3f}",
TriggerType.SPREAD_WIDEN: f"spread widens above {self.threshold:.3f}",
TriggerType.ALWAYS: "unconditional",
}
return mapping.get(self.trigger_type, str(self.trigger_type))
def evaluate(self, market_data: Dict[str, Any]) -> bool:
"""Evaluate condition against live market data."""
price = market_data.get("price", 0.5)
volume = market_data.get("volume_ratio", 1.0)
sentiment = market_data.get("sentiment_delta", 0.0)
kk_phase = market_data.get("kk_phase", 0.0)
spread = market_data.get("spread", 0.0)
now = market_data.get("timestamp", time.time())
checks = {
TriggerType.PRICE_ABOVE: price > self.threshold,
TriggerType.PRICE_BELOW: price < self.threshold,
TriggerType.TIME_AFTER: now > self.threshold,
TriggerType.TIME_BEFORE: now < self.threshold,
TriggerType.VOLUME_SPIKE: volume > self.threshold,
TriggerType.SENTIMENT_SHIFT: abs(sentiment) > self.threshold,
TriggerType.KK_PHASE_TRANSITION: abs(kk_phase - PHI) < 0.1,
TriggerType.MOMENTUM_EXHAUSTED: kk_phase > PHI,
TriggerType.ARB_DETECTED: market_data.get("arb_edge", 0) > self.threshold,
TriggerType.WHALE_MOVE: market_data.get("whale_size", 0) > self.threshold,
TriggerType.SPREAD_NARROW: spread < self.threshold,
TriggerType.SPREAD_WIDEN: spread > self.threshold,
TriggerType.ALWAYS: True,
}
return checks.get(self.trigger_type, False)
@dataclass
class ConditionGroup:
"""Multiple conditions combined with AND/OR logic."""
conditions: List[TriggerCondition] = field(default_factory=list)
logic: CombineLogic = CombineLogic.AND
def evaluate(self, market_data: Dict[str, Any]) -> bool:
if not self.conditions:
return True
results = [c.evaluate(market_data) for c in self.conditions]
if self.logic == CombineLogic.AND:
return all(results)
return any(results)
def to_dict(self) -> Dict[str, Any]:
return {
"logic": self.logic.value,
"conditions": [c.to_dict() for c in self.conditions],
}
# ══════════════════════════════════════════════════════════════════════════════
# 3. CONDITIONAL ACTIONS β€” what to do when triggered
# ══════════════════════════════════════════════════════════════════════════════
@dataclass
class ConditionalAction:
"""A single action in an execution plan.
If conditions met β†’ execute action with specified sizing.
"""
action_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
action_type: ActionType = ActionType.HOLD
conditions: ConditionGroup = field(default_factory=ConditionGroup)
market_id: str = ""
venue: str = ""
size_fraction: float = 0.01 # fraction of bankroll
max_size_dollars: float = 1000.0
limit_price: Optional[float] = None
expires_at: Optional[float] = None # timestamp
priority: int = 0 # higher = execute first
causal_reason: str = "" # WHY this action (from causal engine)
depends_on: Optional[str] = None # action_id that must fire first
cancel_if: Optional[str] = None # action_id that if fired cancels this
def to_dict(self) -> Dict[str, Any]:
result = {
"id": self.action_id,
"action": self.action_type.value,
"market": self.market_id,
"venue": self.venue,
"conditions": self.conditions.to_dict(),
"sizing": {
"fraction": self.size_fraction,
"max_dollars": self.max_size_dollars,
"limit_price": self.limit_price,
},
"priority": self.priority,
"reason": self.causal_reason,
}
if self.expires_at:
result["expires_at"] = self.expires_at
if self.depends_on:
result["depends_on"] = self.depends_on
if self.cancel_if:
result["cancel_if"] = self.cancel_if
return result
def to_human_readable(self) -> str:
conds = " AND ".join(c._default_description() for c in self.conditions.conditions)
if self.conditions.logic == CombineLogic.OR:
conds = " OR ".join(c._default_description() for c in self.conditions.conditions)
if not conds:
conds = "unconditionally"
size_desc = f"{self.size_fraction:.0%} of bankroll (max ${self.max_size_dollars:,.0f})"
price_desc = f" at limit ${self.limit_price:.2f}" if self.limit_price else ""
return (
f"IF {conds} on {self.market_id}:\n"
f" β†’ {self.action_type.value.upper()} {size_desc}{price_desc}\n"
f" Reason: {self.causal_reason}"
)
# ══════════════════════════════════════════════════════════════════════════════
# 4. EXECUTION PLAN β€” full plan with multiple actions
# ══════════════════════════════════════════════════════════════════════════════
@dataclass
class ExecutionPlan:
"""A complete execution plan with conditional actions.
Plans are periodic β€” they evaluate on every market tick and
fire actions when conditions are met.
"""
plan_id: str = field(default_factory=lambda: str(uuid.uuid4())[:12])
name: str = ""
description: str = ""
actions: List[ConditionalAction] = field(default_factory=list)
created_at: float = field(default_factory=time.time)
valid_until: Optional[float] = None
check_interval_seconds: float = 60.0 # how often to evaluate
bankroll: float = 10_000.0
max_total_exposure: float = 0.25 # max 25% of bankroll at risk
def add_action(self, action: ConditionalAction) -> "ExecutionPlan":
self.actions.append(action)
return self
def evaluate(self, market_data: Dict[str, Any]) -> List[ConditionalAction]:
"""Evaluate all actions against current market data.
Returns list of actions whose conditions are met.
"""
now = market_data.get("timestamp", time.time())
if self.valid_until and now > self.valid_until:
return [] # plan expired
fired_ids = set()
pending = sorted(self.actions, key=lambda a: -a.priority)
triggered = []
for action in pending:
# Check expiration
if action.expires_at and now > action.expires_at:
continue
# Check dependency
if action.depends_on and action.depends_on not in fired_ids:
continue
# Check cancellation
if action.cancel_if and action.cancel_if in fired_ids:
continue
# Evaluate conditions
if action.conditions.evaluate(market_data):
triggered.append(action)
fired_ids.add(action.action_id)
return triggered
def to_dict(self) -> Dict[str, Any]:
return {
"plan_id": self.plan_id,
"name": self.name,
"description": self.description,
"created_at": self.created_at,
"valid_until": self.valid_until,
"check_interval_seconds": self.check_interval_seconds,
"bankroll": self.bankroll,
"max_exposure": self.max_total_exposure,
"actions": [a.to_dict() for a in self.actions],
}
def to_json(self, indent: int = 2) -> str:
return json.dumps(self.to_dict(), indent=indent, default=str)
def to_human_readable(self) -> str:
lines = [
f"EXECUTION PLAN: {self.name}",
f"{'=' * 60}",
f"ID: {self.plan_id}",
f"Bankroll: ${self.bankroll:,.0f} | Max exposure: {self.max_total_exposure:.0%}",
f"Check every: {self.check_interval_seconds:.0f}s",
"",
]
for i, action in enumerate(self.actions, 1):
lines.append(f"--- Action {i} ---")
lines.append(action.to_human_readable())
lines.append("")
return "\n".join(lines)
# ══════════════════════════════════════════════════════════════════════════════
# 5. PLAN BUILDER β€” fluent API for constructing plans
# ══════════════════════════════════════════════════════════════════════════════
class ExecutionPlanBuilder:
"""Fluent builder for constructing execution plans.
Usage:
plan = (ExecutionPlanBuilder("Election Trade")
.bankroll(10000)
.when_price_above("election_2026", 0.70)
.then_sell_yes(size=0.05, reason="Take profit at 70%")
.when_price_below("election_2026", 0.45)
.then_buy_yes(size=0.03, reason="Dip buy below 45%")
.when_kk_phase_transition("election_2026")
.then_cash_out(reason="Phase transition detected")
.expires_in_hours(24)
.build())
"""
def __init__(self, name: str):
self._plan = ExecutionPlan(name=name)
self._current_conditions: List[TriggerCondition] = []
self._current_market: str = ""
self._current_logic: CombineLogic = CombineLogic.AND
def bankroll(self, amount: float) -> "ExecutionPlanBuilder":
self._plan.bankroll = amount
return self
def max_exposure(self, fraction: float) -> "ExecutionPlanBuilder":
self._plan.max_total_exposure = fraction
return self
def check_every(self, seconds: float) -> "ExecutionPlanBuilder":
self._plan.check_interval_seconds = seconds
return self
def expires_in_hours(self, hours: float) -> "ExecutionPlanBuilder":
self._plan.valid_until = time.time() + hours * 3600
return self
def expires_at(self, timestamp: float) -> "ExecutionPlanBuilder":
self._plan.valid_until = timestamp
return self
# ── Trigger conditions ──
def when_price_above(self, market_id: str, price: float) -> "ExecutionPlanBuilder":
self._current_market = market_id
self._current_conditions = [TriggerCondition(TriggerType.PRICE_ABOVE, price, market_id)]
return self
def when_price_below(self, market_id: str, price: float) -> "ExecutionPlanBuilder":
self._current_market = market_id
self._current_conditions = [TriggerCondition(TriggerType.PRICE_BELOW, price, market_id)]
return self
def when_arb_detected(self, market_id: str, min_edge: float = 0.02) -> "ExecutionPlanBuilder":
self._current_market = market_id
self._current_conditions = [TriggerCondition(TriggerType.ARB_DETECTED, min_edge, market_id)]
return self
def when_kk_phase_transition(self, market_id: str) -> "ExecutionPlanBuilder":
self._current_market = market_id
self._current_conditions = [TriggerCondition(TriggerType.KK_PHASE_TRANSITION, PHI, market_id)]
return self
def when_momentum_exhausted(self, market_id: str) -> "ExecutionPlanBuilder":
self._current_market = market_id
self._current_conditions = [TriggerCondition(TriggerType.MOMENTUM_EXHAUSTED, PHI, market_id)]
return self
def when_whale_move(self, market_id: str, min_size: float = 50_000) -> "ExecutionPlanBuilder":
self._current_market = market_id
self._current_conditions = [TriggerCondition(TriggerType.WHALE_MOVE, min_size, market_id)]
return self
def when_volume_spike(self, market_id: str, ratio: float = 3.0) -> "ExecutionPlanBuilder":
self._current_market = market_id
self._current_conditions = [TriggerCondition(TriggerType.VOLUME_SPIKE, ratio, market_id)]
return self
def when_sentiment_shift(self, market_id: str, delta: float = 0.3) -> "ExecutionPlanBuilder":
self._current_market = market_id
self._current_conditions = [TriggerCondition(TriggerType.SENTIMENT_SHIFT, delta, market_id)]
return self
def and_also(self, trigger_type: TriggerType, threshold: float = 0.0) -> "ExecutionPlanBuilder":
self._current_conditions.append(
TriggerCondition(trigger_type, threshold, self._current_market))
self._current_logic = CombineLogic.AND
return self
def or_else(self, trigger_type: TriggerType, threshold: float = 0.0) -> "ExecutionPlanBuilder":
self._current_conditions.append(
TriggerCondition(trigger_type, threshold, self._current_market))
self._current_logic = CombineLogic.OR
return self
# ── Actions ──
def _add_action(self, action_type: ActionType, size: float = 0.01,
max_dollars: float = 1000.0, limit_price: Optional[float] = None,
reason: str = "") -> "ExecutionPlanBuilder":
action = ConditionalAction(
action_type=action_type,
conditions=ConditionGroup(list(self._current_conditions), self._current_logic),
market_id=self._current_market,
size_fraction=size,
max_size_dollars=max_dollars,
limit_price=limit_price,
causal_reason=reason,
priority=len(self._plan.actions),
)
self._plan.add_action(action)
self._current_conditions = []
return self
def then_buy_yes(self, size: float = 0.01, reason: str = "") -> "ExecutionPlanBuilder":
return self._add_action(ActionType.BUY_YES, size, reason=reason)
def then_buy_no(self, size: float = 0.01, reason: str = "") -> "ExecutionPlanBuilder":
return self._add_action(ActionType.BUY_NO, size, reason=reason)
def then_sell_yes(self, size: float = 0.01, reason: str = "") -> "ExecutionPlanBuilder":
return self._add_action(ActionType.SELL_YES, size, reason=reason)
def then_sell_no(self, size: float = 0.01, reason: str = "") -> "ExecutionPlanBuilder":
return self._add_action(ActionType.SELL_NO, size, reason=reason)
def then_cash_out(self, reason: str = "") -> "ExecutionPlanBuilder":
return self._add_action(ActionType.CASH_OUT, size=1.0, reason=reason)
def then_vote_yes(self, reason: str = "") -> "ExecutionPlanBuilder":
return self._add_action(ActionType.VOTE_YES, reason=reason)
def then_vote_no(self, reason: str = "") -> "ExecutionPlanBuilder":
return self._add_action(ActionType.VOTE_NO, reason=reason)
def then_hedge(self, size: float = 0.01, reason: str = "") -> "ExecutionPlanBuilder":
return self._add_action(ActionType.HEDGE, size, reason=reason)
def then_stop_loss(self, size: float = 1.0, reason: str = "") -> "ExecutionPlanBuilder":
return self._add_action(ActionType.STOP_LOSS, size, reason=reason)
def then_take_profit(self, size: float = 0.5, reason: str = "") -> "ExecutionPlanBuilder":
return self._add_action(ActionType.TAKE_PROFIT, size, reason=reason)
def build(self) -> ExecutionPlan:
return self._plan
# ══════════════════════════════════════════════════════════════════════════════
# 6. PLAN EXECUTION TRACKER β€” state machine for plan lifecycle
# ══════════════════════════════════════════════════════════════════════════════
@dataclass
class PlanExecutionLog:
"""Log entry for a plan action that fired."""
action_id: str
action_type: str
market_id: str
timestamp: float
market_data: Dict[str, Any]
size_dollars: float
reason: str
class PlanExecutionTracker:
"""Tracks plan lifecycle: pending β†’ monitoring β†’ triggered β†’ executed β†’ expired."""
def __init__(self):
self._plans: Dict[str, ExecutionPlan] = {}
self._execution_log: List[PlanExecutionLog] = []
def register_plan(self, plan: ExecutionPlan) -> None:
self._plans[plan.plan_id] = plan
def tick(self, market_data: Dict[str, Any]) -> List[PlanExecutionLog]:
"""Evaluate all registered plans against current market data."""
logs = []
for plan in list(self._plans.values()):
triggered = plan.evaluate(market_data)
for action in triggered:
size = action.size_fraction * plan.bankroll
size = min(size, action.max_size_dollars)
log = PlanExecutionLog(
action_id=action.action_id,
action_type=action.action_type.value,
market_id=action.market_id,
timestamp=time.time(),
market_data=market_data,
size_dollars=size,
reason=action.causal_reason,
)
logs.append(log)
self._execution_log.append(log)
return logs
def remove_plan(self, plan_id: str) -> None:
self._plans.pop(plan_id, None)
@property
def active_plans(self) -> int:
return len(self._plans)
@property
def execution_history(self) -> List[PlanExecutionLog]:
return self._execution_log
# ══════════════════════════════════════════════════════════════════════════════
# 7. EXAMPLE PLAN GENERATOR β€” creates plans from market analysis
# ══════════════════════════════════════════════════════════════════════════════
def generate_arb_plan(market_id: str, yes_venue: str, no_venue: str,
yes_price: float, no_price: float,
bankroll: float = 10_000) -> ExecutionPlan:
"""Generate a cross-venue arbitrage execution plan."""
edge = 1.0 - yes_price - no_price
return (ExecutionPlanBuilder(f"Arb: {market_id}")
.bankroll(bankroll)
.when_price_below(market_id, yes_price + 0.02)
.then_buy_yes(size=0.03, reason=f"Cross-venue arb edge {edge:.1%}")
.when_price_below(market_id, no_price + 0.02)
.then_buy_no(size=0.03, reason=f"Hedge NO side, lock in {edge:.1%}")
.when_price_above(market_id, yes_price + 0.10)
.then_sell_yes(size=0.03, reason="Take profit on YES leg")
.when_kk_phase_transition(market_id)
.then_cash_out(reason="Phase transition β€” exit all")
.expires_in_hours(4)
.build())
def generate_momentum_plan(market_id: str, current_price: float,
trend_direction: str = "up",
bankroll: float = 10_000) -> ExecutionPlan:
"""Generate a momentum-following execution plan."""
builder = ExecutionPlanBuilder(f"Momentum: {market_id}").bankroll(bankroll)
if trend_direction == "up":
builder.when_price_above(market_id, current_price + 0.05)
builder.then_buy_yes(size=0.02, reason="Momentum continuation")
builder.when_price_above(market_id, current_price + 0.15)
builder.then_take_profit(size=0.5, reason="Take profit at +15%")
builder.when_price_below(market_id, current_price - 0.05)
builder.then_stop_loss(reason="Stop loss at -5%")
else:
builder.when_price_below(market_id, current_price - 0.05)
builder.then_buy_no(size=0.02, reason="Downward momentum")
builder.when_price_below(market_id, current_price - 0.15)
builder.then_take_profit(size=0.5, reason="Take profit at -15%")
builder.when_price_above(market_id, current_price + 0.05)
builder.then_stop_loss(reason="Stop loss at +5%")
builder.when_momentum_exhausted(market_id)
builder.then_cash_out(reason="KK phase > Ο† β€” momentum exhausted")
builder.expires_in_hours(8)
return builder.build()
def generate_event_plan(market_id: str, event_time: float,
bankroll: float = 10_000) -> ExecutionPlan:
"""Generate a pre-event / post-event execution plan."""
pre_event = event_time - 3600 # 1 hour before
return (ExecutionPlanBuilder(f"Event: {market_id}")
.bankroll(bankroll)
# Pre-event: build position
.when_sentiment_shift(market_id, delta=0.3)
.then_buy_yes(size=0.02, reason="Strong positive sentiment pre-event")
.when_volume_spike(market_id, ratio=5.0)
.then_buy_yes(size=0.01, reason="Volume surge indicates insider knowledge")
# During event: scale
.when_price_above(market_id, 0.75)
.then_take_profit(size=0.5, reason="High confidence β€” lock in gains")
.when_price_below(market_id, 0.25)
.then_stop_loss(reason="Event went against us")
# Post-event: exit
.when_kk_phase_transition(market_id)
.then_cash_out(reason="Post-event phase transition")
.expires_in_hours(12)
.build())