JoshTest2 / trading.py
LittleMonkeyLab's picture
Upload folder using huggingface_hub
eeeb58e verified
"""
Trading engine with scenario management and portfolio tracking.
Handles game state, decision processing, and outcome calculation.
"""
import random
import uuid
from datetime import datetime
from typing import List, Optional, Dict, Any, Tuple
from dataclasses import dataclass, field
from config import (
Scenario, SCENARIOS, ExperimentConfig, DEFAULT_CONFIG,
ResearcherControlledParams, ParticipantVisibleParams
)
@dataclass
class Portfolio:
"""Represents a participant's portfolio state."""
cash: float
initial_value: float
positions: Dict[str, Dict[str, Any]] = field(default_factory=dict)
history: List[Dict[str, Any]] = field(default_factory=list)
@property
def total_value(self) -> float:
"""Calculate total portfolio value (cash + positions)."""
position_value = sum(
pos["shares"] * pos["current_price"]
for pos in self.positions.values()
)
return self.cash + position_value
@property
def return_percentage(self) -> float:
"""Calculate portfolio return as percentage."""
if self.initial_value == 0:
return 0
return ((self.total_value - self.initial_value) / self.initial_value) * 100
def record_state(self, scenario_id: str, action: str, outcome: float):
"""Record a portfolio state change."""
self.history.append({
"timestamp": datetime.now().isoformat(),
"scenario_id": scenario_id,
"action": action,
"outcome_pct": outcome,
"portfolio_value": self.total_value
})
@dataclass
class DecisionOutcome:
"""Result of a trading decision."""
scenario_id: str
decision: str # "BUY", "SELL", "HOLD"
outcome_percentage: float
outcome_amount: float
portfolio_before: float
portfolio_after: float
ai_was_correct: bool
followed_ai: bool
was_optimal: bool
class ScenarioManager:
"""Manages scenario selection and ordering for experiments."""
def __init__(self, config: ExperimentConfig = DEFAULT_CONFIG):
self.config = config
self.all_scenarios = SCENARIOS.copy()
self.session_scenarios: List[Scenario] = []
self.current_index: int = 0
def initialize_session(self, shuffle: bool = True) -> List[Scenario]:
"""
Initialize scenarios for a new session.
Returns the list of scenarios that will be presented.
"""
# Select scenarios for this session
num_scenarios = min(self.config.scenarios_per_session, len(self.all_scenarios))
self.session_scenarios = self.all_scenarios[:num_scenarios]
if shuffle:
random.shuffle(self.session_scenarios)
self.current_index = 0
return self.session_scenarios
def get_current_scenario(self) -> Optional[Scenario]:
"""Get the current scenario."""
if self.current_index < len(self.session_scenarios):
return self.session_scenarios[self.current_index]
return None
def advance_to_next(self) -> Optional[Scenario]:
"""Move to the next scenario and return it."""
self.current_index += 1
return self.get_current_scenario()
def get_progress(self) -> Tuple[int, int]:
"""Return (current_number, total) for progress display."""
return (self.current_index + 1, len(self.session_scenarios))
def is_complete(self) -> bool:
"""Check if all scenarios have been completed."""
return self.current_index >= len(self.session_scenarios)
def reset(self):
"""Reset the scenario manager."""
self.session_scenarios = []
self.current_index = 0
class TradingEngine:
"""
Main trading engine that processes decisions and manages game state.
"""
def __init__(self, config: ExperimentConfig = DEFAULT_CONFIG):
self.config = config
self.portfolio: Optional[Portfolio] = None
self.scenario_manager = ScenarioManager(config)
self.decisions_made: List[DecisionOutcome] = []
def start_new_game(self) -> Tuple[Portfolio, Scenario]:
"""
Start a new trading game.
Returns the initial portfolio and first scenario.
"""
# Initialize portfolio
self.portfolio = Portfolio(
cash=self.config.initial_portfolio_value,
initial_value=self.config.initial_portfolio_value
)
# Initialize scenarios
self.scenario_manager.initialize_session(shuffle=True)
self.decisions_made = []
first_scenario = self.scenario_manager.get_current_scenario()
return self.portfolio, first_scenario
def process_decision(
self,
scenario: Scenario,
decision: str, # "BUY", "SELL", "HOLD"
trade_amount: float,
ai_recommendation: str
) -> DecisionOutcome:
"""
Process a trading decision and return the outcome.
"""
# Validate decision
decision = decision.upper()
if decision not in ["BUY", "SELL", "HOLD"]:
raise ValueError(f"Invalid decision: {decision}")
# Get outcome percentage based on decision
outcome_map = {
"BUY": scenario.outcome_buy,
"SELL": scenario.outcome_sell,
"HOLD": scenario.outcome_hold
}
outcome_pct = outcome_map[decision]
# Calculate outcome amount
portfolio_before = self.portfolio.total_value
# For simplicity, we apply the outcome to the trade amount
# In a more complex system, you might track actual share positions
if decision in ["BUY", "HOLD"]:
# Participant is exposed to the stock's movement
outcome_amount = trade_amount * outcome_pct
else: # SELL
# Participant avoided the stock's movement (inverse)
outcome_amount = trade_amount * outcome_pct
# Update portfolio
self.portfolio.cash += outcome_amount
portfolio_after = self.portfolio.total_value
# Record state
self.portfolio.record_state(scenario.scenario_id, decision, outcome_pct)
# Determine if AI was followed and if decision was optimal
followed_ai = (decision == ai_recommendation)
was_optimal = (decision == scenario.optimal_action)
outcome = DecisionOutcome(
scenario_id=scenario.scenario_id,
decision=decision,
outcome_percentage=outcome_pct,
outcome_amount=outcome_amount,
portfolio_before=portfolio_before,
portfolio_after=portfolio_after,
ai_was_correct=scenario.ai_is_correct,
followed_ai=followed_ai,
was_optimal=was_optimal
)
self.decisions_made.append(outcome)
return outcome
def get_next_scenario(self) -> Optional[Scenario]:
"""Get the next scenario in the session."""
return self.scenario_manager.advance_to_next()
def is_game_complete(self) -> bool:
"""Check if the game is complete."""
return self.scenario_manager.is_complete()
def get_game_summary(self) -> Dict[str, Any]:
"""Get a summary of the completed game."""
if not self.portfolio:
return {}
total_decisions = len(self.decisions_made)
ai_followed_count = sum(1 for d in self.decisions_made if d.followed_ai)
optimal_count = sum(1 for d in self.decisions_made if d.was_optimal)
# Calculate when AI was correct vs wrong
ai_correct_decisions = [d for d in self.decisions_made if d.ai_was_correct]
ai_wrong_decisions = [d for d in self.decisions_made if not d.ai_was_correct]
followed_when_correct = sum(1 for d in ai_correct_decisions if d.followed_ai)
followed_when_wrong = sum(1 for d in ai_wrong_decisions if d.followed_ai)
return {
"initial_portfolio": self.portfolio.initial_value,
"final_portfolio": self.portfolio.total_value,
"total_return": self.portfolio.total_value - self.portfolio.initial_value,
"return_percentage": self.portfolio.return_percentage,
"total_decisions": total_decisions,
"ai_follow_rate": ai_followed_count / total_decisions if total_decisions > 0 else 0,
"optimal_decision_rate": optimal_count / total_decisions if total_decisions > 0 else 0,
"followed_correct_ai": followed_when_correct,
"followed_incorrect_ai": followed_when_wrong,
"ai_correct_scenarios": len(ai_correct_decisions),
"ai_incorrect_scenarios": len(ai_wrong_decisions),
"decisions": [
{
"scenario": d.scenario_id,
"decision": d.decision,
"outcome": f"{d.outcome_percentage * 100:+.1f}%",
"followed_ai": d.followed_ai,
"was_optimal": d.was_optimal
}
for d in self.decisions_made
]
}
def get_progress_info(self) -> Dict[str, Any]:
"""Get current progress information."""
current, total = self.scenario_manager.get_progress()
return {
"current_scenario": current,
"total_scenarios": total,
"progress_percentage": (current / total) * 100 if total > 0 else 0,
"portfolio_value": self.portfolio.total_value if self.portfolio else 0,
"portfolio_return": self.portfolio.return_percentage if self.portfolio else 0
}
def calculate_suggested_trade_amount(
portfolio_value: float,
risk_level: int = 50
) -> float:
"""
Calculate a suggested trade amount based on portfolio and risk level.
"""
# Base: 10-30% of portfolio depending on risk level
min_pct = 0.10
max_pct = 0.30
risk_factor = risk_level / 100
suggested_pct = min_pct + (max_pct - min_pct) * risk_factor
return round(portfolio_value * suggested_pct, 2)
def format_currency(amount: float, symbol: str = "credits") -> str:
"""Format a currency amount for display."""
if amount >= 0:
return f"{amount:,.2f} {symbol}"
else:
return f"-{abs(amount):,.2f} {symbol}"
def format_percentage(value: float, include_sign: bool = True) -> str:
"""Format a percentage for display."""
if include_sign:
return f"{value * 100:+.1f}%"
return f"{value * 100:.1f}%"