# File size: 10,485 Bytes
# eeeb58e
"""
Trading engine with scenario management and portfolio tracking.
Handles game state, decision processing, and outcome calculation.
"""

import random
import uuid
from datetime import datetime
from typing import List, Optional, Dict, Any, Tuple
from dataclasses import dataclass, field

from config import (
    Scenario, SCENARIOS, ExperimentConfig, DEFAULT_CONFIG,
    ResearcherControlledParams, ParticipantVisibleParams
)


@dataclass
class Portfolio:
    """Cash balance, open positions, and change log for one participant.

    Each entry in ``positions`` is a dict that carries at least the keys
    ``"shares"`` and ``"current_price"``; both are read when valuing the
    portfolio.
    """
    cash: float
    initial_value: float
    positions: Dict[str, Dict[str, Any]] = field(default_factory=dict)
    history: List[Dict[str, Any]] = field(default_factory=list)

    @property
    def total_value(self) -> float:
        """Return cash plus shares * current_price over every position."""
        holdings_value = 0
        for holding in self.positions.values():
            holdings_value += holding["shares"] * holding["current_price"]
        return self.cash + holdings_value

    @property
    def return_percentage(self) -> float:
        """Return the gain or loss relative to ``initial_value``, in percent.

        Yields 0 when ``initial_value`` is zero so the division is skipped.
        """
        baseline = self.initial_value
        if baseline == 0:
            return 0
        gain = self.total_value - baseline
        return (gain / baseline) * 100

    def record_state(self, scenario_id: str, action: str, outcome: float):
        """Append a timestamped snapshot of the portfolio to ``history``.

        Args:
            scenario_id: Identifier of the scenario that triggered the change.
            action: The action taken, stored as given.
            outcome: Outcome value stored under the ``"outcome_pct"`` key.
        """
        snapshot = {
            "timestamp": datetime.now().isoformat(),
            "scenario_id": scenario_id,
            "action": action,
            "outcome_pct": outcome,
            "portfolio_value": self.total_value,
        }
        self.history.append(snapshot)


@dataclass
class DecisionOutcome:
    """Result of a single trading decision.

    One instance is built per processed decision by
    ``TradingEngine.process_decision`` and collected for the game summary.
    """
    # Identifier of the scenario the decision was made on.
    scenario_id: str
    # The participant's action: "BUY", "SELL", or "HOLD" (upper-cased).
    decision: str
    # Scenario outcome for the chosen action, stored as a fraction; the
    # game summary multiplies it by 100 when rendering a percentage.
    outcome_percentage: float
    # Amount added to the portfolio's cash: trade amount * outcome_percentage.
    outcome_amount: float
    # Total portfolio value immediately before the outcome was applied.
    portfolio_before: float
    # Total portfolio value immediately after the outcome was applied.
    portfolio_after: float
    # Copied from the scenario's ``ai_is_correct`` flag.
    ai_was_correct: bool
    # True when the decision equals the AI recommendation passed in.
    followed_ai: bool
    # True when the decision equals the scenario's ``optimal_action``.
    was_optimal: bool


class ScenarioManager:
    """Manages scenario selection and ordering for experiments.

    Holds a private copy of the module-level ``SCENARIOS`` list, the subset
    chosen for the current session, and a cursor into that subset.
    """

    def __init__(self, config: ExperimentConfig = DEFAULT_CONFIG):
        """Create a manager with no active session.

        Args:
            config: Experiment configuration; ``scenarios_per_session`` is
                read when a session is initialized.
        """
        self.config = config
        self.all_scenarios = SCENARIOS.copy()
        self.session_scenarios: List[Scenario] = []
        self.current_index: int = 0

    def initialize_session(self, shuffle: bool = True) -> List[Scenario]:
        """
        Initialize scenarios for a new session.

        Takes the first ``scenarios_per_session`` scenarios (capped at the
        number available), optionally shuffles them in place, and rewinds
        the cursor to the first one.

        Args:
            shuffle: When true, randomize the presentation order.

        Returns:
            The list of scenarios that will be presented.
        """
        # Clamp at both ends: a negative count would otherwise turn the
        # slice below into "everything except the last k scenarios".
        requested = self.config.scenarios_per_session
        num_scenarios = max(0, min(requested, len(self.all_scenarios)))
        self.session_scenarios = self.all_scenarios[:num_scenarios]

        if shuffle:
            # The slice above is a new list, so all_scenarios keeps its order.
            random.shuffle(self.session_scenarios)

        self.current_index = 0
        return self.session_scenarios

    def get_current_scenario(self) -> Optional[Scenario]:
        """Return the scenario under the cursor, or None past the end."""
        if self.current_index < len(self.session_scenarios):
            return self.session_scenarios[self.current_index]
        return None

    def advance_to_next(self) -> Optional[Scenario]:
        """Move to the next scenario and return it (None when exhausted)."""
        self.current_index += 1
        return self.get_current_scenario()

    def get_progress(self) -> Tuple[int, int]:
        """Return (current_number, total) for progress display.

        ``current_number`` is 1-based and never exceeds ``total``, so a
        finished session reports ``(total, total)`` and an empty session
        reports ``(0, 0)``.
        """
        total = len(self.session_scenarios)
        # Once the cursor moves past the last scenario, index + 1 would
        # overshoot the total; cap it for display purposes.
        return (min(self.current_index + 1, total), total)

    def is_complete(self) -> bool:
        """Check if all scenarios have been completed."""
        return self.current_index >= len(self.session_scenarios)

    def reset(self):
        """Reset the scenario manager to the no-active-session state."""
        self.session_scenarios = []
        self.current_index = 0


class TradingEngine:
    """
    Main trading engine that processes decisions and manages game state.

    Owns the participant's Portfolio, a ScenarioManager that orders the
    session's scenarios, and the list of DecisionOutcome records produced
    so far.
    """

    def __init__(self, config: ExperimentConfig = DEFAULT_CONFIG):
        """Create an engine with no active game.

        Args:
            config: Experiment configuration shared with the scenario manager.
        """
        self.config = config
        self.portfolio: Optional[Portfolio] = None
        self.scenario_manager = ScenarioManager(config)
        self.decisions_made: List[DecisionOutcome] = []

    def start_new_game(self) -> Tuple[Portfolio, Optional[Scenario]]:
        """
        Start a new trading game.

        Creates a fresh all-cash portfolio worth
        ``config.initial_portfolio_value``, initializes a shuffled scenario
        session, and clears any previously recorded decisions.

        Returns:
            The initial portfolio and the first scenario (None when the
            session contains no scenarios).
        """
        self.portfolio = Portfolio(
            cash=self.config.initial_portfolio_value,
            initial_value=self.config.initial_portfolio_value
        )

        self.scenario_manager.initialize_session(shuffle=True)
        self.decisions_made = []

        first_scenario = self.scenario_manager.get_current_scenario()
        return self.portfolio, first_scenario

    def process_decision(
        self,
        scenario: Scenario,
        decision: str,  # "BUY", "SELL", "HOLD"
        trade_amount: float,
        ai_recommendation: str
    ) -> DecisionOutcome:
        """
        Process a trading decision and return the outcome.

        The scenario's outcome for the chosen action is multiplied by
        ``trade_amount`` and added to the portfolio's cash. The result is
        logged in the portfolio history and appended to ``decisions_made``.

        Args:
            scenario: The scenario being decided on.
            decision: "BUY", "SELL", or "HOLD" (case-insensitive).
            trade_amount: Amount the outcome fraction is applied to.
            ai_recommendation: The AI's recommended action; compared
                case-insensitively with ``decision``.

        Returns:
            The DecisionOutcome describing this decision.

        Raises:
            ValueError: If ``decision`` is not one of the three actions.
            RuntimeError: If no game has been started yet.
        """
        decision = decision.upper()
        if decision not in ("BUY", "SELL", "HOLD"):
            raise ValueError(f"Invalid decision: {decision}")

        if self.portfolio is None:
            # Fail with a clear message instead of an AttributeError on None.
            raise RuntimeError(
                "No active game: call start_new_game() before process_decision()"
            )

        # Each scenario carries its own outcome fraction per action.
        outcome_map = {
            "BUY": scenario.outcome_buy,
            "SELL": scenario.outcome_sell,
            "HOLD": scenario.outcome_hold
        }
        outcome_pct = outcome_map[decision]

        portfolio_before = self.portfolio.total_value

        # The per-action outcome already encodes the direction of the
        # result, so the same formula applies to BUY, SELL and HOLD.
        # NOTE(review): trade_amount is not validated against available
        # cash or for sign — confirm callers constrain it.
        outcome_amount = trade_amount * outcome_pct

        self.portfolio.cash += outcome_amount
        portfolio_after = self.portfolio.total_value

        self.portfolio.record_state(scenario.scenario_id, decision, outcome_pct)

        # Normalize the recommendation the same way as the decision so that
        # "buy" vs "BUY" is not misreported as ignoring the AI.
        recommended = (
            ai_recommendation.upper()
            if isinstance(ai_recommendation, str)
            else ai_recommendation
        )
        followed_ai = (decision == recommended)
        was_optimal = (decision == scenario.optimal_action)

        outcome = DecisionOutcome(
            scenario_id=scenario.scenario_id,
            decision=decision,
            outcome_percentage=outcome_pct,
            outcome_amount=outcome_amount,
            portfolio_before=portfolio_before,
            portfolio_after=portfolio_after,
            ai_was_correct=scenario.ai_is_correct,
            followed_ai=followed_ai,
            was_optimal=was_optimal
        )

        self.decisions_made.append(outcome)
        return outcome

    def get_next_scenario(self) -> Optional[Scenario]:
        """Advance to and return the next scenario (None when exhausted)."""
        return self.scenario_manager.advance_to_next()

    def is_game_complete(self) -> bool:
        """Check if the game is complete."""
        return self.scenario_manager.is_complete()

    def get_game_summary(self) -> Dict[str, Any]:
        """Get a summary of the game so far.

        Returns:
            An empty dict when no game has been started; otherwise a dict
            with portfolio totals, AI-follow and optimal-decision rates
            (fractions in 0..1, or 0 with no decisions), counts split by
            whether the AI was correct, and a per-decision breakdown.
        """
        if not self.portfolio:
            return {}

        total_decisions = len(self.decisions_made)
        ai_followed_count = sum(1 for d in self.decisions_made if d.followed_ai)
        optimal_count = sum(1 for d in self.decisions_made if d.was_optimal)

        # Split decisions by whether the AI's advice was right, so that
        # reliance on correct vs. incorrect advice can be told apart.
        ai_correct_decisions = [d for d in self.decisions_made if d.ai_was_correct]
        ai_wrong_decisions = [d for d in self.decisions_made if not d.ai_was_correct]

        followed_when_correct = sum(1 for d in ai_correct_decisions if d.followed_ai)
        followed_when_wrong = sum(1 for d in ai_wrong_decisions if d.followed_ai)

        return {
            "initial_portfolio": self.portfolio.initial_value,
            "final_portfolio": self.portfolio.total_value,
            "total_return": self.portfolio.total_value - self.portfolio.initial_value,
            "return_percentage": self.portfolio.return_percentage,
            "total_decisions": total_decisions,
            "ai_follow_rate": ai_followed_count / total_decisions if total_decisions > 0 else 0,
            "optimal_decision_rate": optimal_count / total_decisions if total_decisions > 0 else 0,
            "followed_correct_ai": followed_when_correct,
            "followed_incorrect_ai": followed_when_wrong,
            "ai_correct_scenarios": len(ai_correct_decisions),
            "ai_incorrect_scenarios": len(ai_wrong_decisions),
            "decisions": [
                {
                    "scenario": d.scenario_id,
                    "decision": d.decision,
                    "outcome": f"{d.outcome_percentage * 100:+.1f}%",
                    "followed_ai": d.followed_ai,
                    "was_optimal": d.was_optimal
                }
                for d in self.decisions_made
            ]
        }

    def get_progress_info(self) -> Dict[str, Any]:
        """Get current progress information.

        Returns:
            A dict with the 1-based current scenario number, the total
            number of scenarios, progress as a percentage (0 when there are
            no scenarios), and the portfolio's value and return (0 when no
            game has been started).
        """
        current, total = self.scenario_manager.get_progress()
        # After the final scenario the manager's cursor sits one past the
        # end; cap the number so progress never reads above 100%.
        current = min(current, total)
        return {
            "current_scenario": current,
            "total_scenarios": total,
            "progress_percentage": (current / total) * 100 if total > 0 else 0,
            "portfolio_value": self.portfolio.total_value if self.portfolio else 0,
            "portfolio_return": self.portfolio.return_percentage if self.portfolio else 0
        }


def calculate_suggested_trade_amount(
    portfolio_value: float,
    risk_level: int = 50,
    *,
    min_pct: float = 0.10,
    max_pct: float = 0.30,
) -> float:
    """
    Calculate a suggested trade amount based on portfolio and risk level.

    The suggested share of the portfolio is interpolated linearly between
    ``min_pct`` (at risk level 0) and ``max_pct`` (at risk level 100).

    Args:
        portfolio_value: Current total portfolio value.
        risk_level: Risk appetite on a 0-100 scale; values outside that
            range are clamped to it.
        min_pct: Fraction of the portfolio suggested at risk level 0.
        max_pct: Fraction of the portfolio suggested at risk level 100.

    Returns:
        The suggested amount, rounded to two decimal places.
    """
    # Clamp so that out-of-range input cannot push the suggestion outside
    # the [min_pct, max_pct] band.
    bounded_risk = min(max(risk_level, 0), 100)
    risk_factor = bounded_risk / 100

    suggested_pct = min_pct + (max_pct - min_pct) * risk_factor
    return round(portfolio_value * suggested_pct, 2)


def format_currency(amount: float, symbol: str = "credits") -> str:
    """Format a currency amount for display.

    The amount is rendered with thousands separators and two decimals,
    followed by a space and ``symbol``. A leading minus sign is shown only
    when the rounded magnitude is non-zero, so tiny negative values render
    as "0.00" rather than "-0.00".

    Args:
        amount: The amount to format.
        symbol: Unit label appended after the number.

    Returns:
        The formatted string, e.g. "-1,234.50 credits".
    """
    magnitude = f"{abs(amount):,.2f}"
    # Only zeros and separators left means the value rounds to zero at two
    # decimals, in which case a minus sign would be misleading.
    rounds_to_zero = not magnitude.strip("0,.")
    sign = "-" if amount < 0 and not rounds_to_zero else ""
    return f"{sign}{magnitude} {symbol}"


def format_percentage(value: float, include_sign: bool = True) -> str:
    """Render a fractional value as a percentage with one decimal place.

    Args:
        value: The fraction to display; it is multiplied by 100.
        include_sign: When true, non-negative results get an explicit "+".

    Returns:
        The formatted percentage, e.g. "+12.3%" or "12.3%".
    """
    number_format = "+.1f" if include_sign else ".1f"
    return format(value * 100, number_format) + "%"