Spaces:
Sleeping
Sleeping
| """ | |
| Trading engine with scenario management and portfolio tracking. | |
| Handles game state, decision processing, and outcome calculation. | |
| """ | |
| import random | |
| import uuid | |
| from datetime import datetime | |
| from typing import List, Optional, Dict, Any, Tuple | |
| from dataclasses import dataclass, field | |
| from config import ( | |
| Scenario, SCENARIOS, ExperimentConfig, DEFAULT_CONFIG, | |
| ResearcherControlledParams, ParticipantVisibleParams | |
| ) | |
@dataclass
class Portfolio:
    """Represents a participant's portfolio state.

    Declared as a dataclass so it can be built with keyword arguments
    (``Portfolio(cash=..., initial_value=...)``) and so that each instance
    gets its own ``positions`` dict and ``history`` list.
    """

    # Uninvested balance; added to the position values in ``total_value``.
    cash: float
    # Baseline value that ``return_percentage`` is measured against.
    initial_value: float
    # Keyed by position name; each entry is read for "shares" and
    # "current_price" when valuing the portfolio.
    positions: Dict[str, Dict[str, Any]] = field(default_factory=dict)
    # Snapshots appended by ``record_state``.
    history: List[Dict[str, Any]] = field(default_factory=list)

    @property
    def total_value(self) -> float:
        """Calculate total portfolio value (cash + positions)."""
        position_value = sum(
            pos["shares"] * pos["current_price"]
            for pos in self.positions.values()
        )
        return self.cash + position_value

    @property
    def return_percentage(self) -> float:
        """Calculate portfolio return as a percentage of ``initial_value``.

        Returns 0.0 when ``initial_value`` is zero to avoid dividing by zero.
        """
        if self.initial_value == 0:
            return 0.0
        return ((self.total_value - self.initial_value) / self.initial_value) * 100

    def record_state(self, scenario_id: str, action: str, outcome: float) -> None:
        """Append a snapshot of the portfolio to ``history``.

        Args:
            scenario_id: Identifier of the scenario that triggered the change.
            action: The action taken, stored as given.
            outcome: Outcome value stored under the "outcome_pct" key.
        """
        self.history.append({
            "timestamp": datetime.now().isoformat(),
            "scenario_id": scenario_id,
            "action": action,
            "outcome_pct": outcome,
            # Value at the moment of recording, i.e. after any cash update
            # the caller has already applied.
            "portfolio_value": self.total_value
        })
@dataclass
class DecisionOutcome:
    """Result of a trading decision.

    Declared as a dataclass so it can be constructed with keyword arguments,
    one per field below.
    """

    scenario_id: str
    decision: str  # "BUY", "SELL", "HOLD"
    # Outcome rate for the chosen decision; the game summary multiplies it
    # by 100 for display.
    outcome_percentage: float
    # Amount added to the portfolio's cash as a result of the decision.
    outcome_amount: float
    # Total portfolio value immediately before / after applying the outcome.
    portfolio_before: float
    portfolio_after: float
    # Whether the scenario marks the AI recommendation as correct.
    ai_was_correct: bool
    # Whether the participant's decision matched the AI recommendation.
    followed_ai: bool
    # Whether the decision matched the scenario's optimal action.
    was_optimal: bool
class ScenarioManager:
    """Manages scenario selection and ordering for experiments."""

    def __init__(self, config: ExperimentConfig = DEFAULT_CONFIG):
        """Store the config and take a private copy of the scenario pool."""
        self.config = config
        # Copy so that later changes never touch the shared SCENARIOS object.
        self.all_scenarios = SCENARIOS.copy()
        self.session_scenarios: List[Scenario] = []
        self.current_index: int = 0

    def initialize_session(self, shuffle: bool = True) -> List[Scenario]:
        """
        Initialize scenarios for a new session.

        Takes the first ``config.scenarios_per_session`` scenarios from the
        pool (or all of them if the pool is smaller), optionally shuffles
        their order, and rewinds the cursor to the start.

        Returns the list of scenarios that will be presented.
        """
        num_scenarios = min(self.config.scenarios_per_session, len(self.all_scenarios))
        # The slice is a new list, so shuffling leaves all_scenarios in order.
        self.session_scenarios = self.all_scenarios[:num_scenarios]
        if shuffle:
            random.shuffle(self.session_scenarios)
        self.current_index = 0
        return self.session_scenarios

    def get_current_scenario(self) -> Optional[Scenario]:
        """Get the current scenario, or None once the session is exhausted."""
        if self.current_index < len(self.session_scenarios):
            return self.session_scenarios[self.current_index]
        return None

    def advance_to_next(self) -> Optional[Scenario]:
        """Move to the next scenario and return it (None if there is none)."""
        self.current_index += 1
        return self.get_current_scenario()

    def get_progress(self) -> Tuple[int, int]:
        """Return (current_number, total) for progress display.

        ``current_number`` is 1-based and is capped at ``total`` so that a
        finished session reports e.g. (10, 10) rather than (11, 10), and an
        empty session reports (0, 0) rather than (1, 0).
        """
        total = len(self.session_scenarios)
        return (min(self.current_index + 1, total), total)

    def is_complete(self) -> bool:
        """Check if all scenarios have been completed."""
        return self.current_index >= len(self.session_scenarios)

    def reset(self) -> None:
        """Reset the scenario manager to an empty, un-started session."""
        self.session_scenarios = []
        self.current_index = 0
class TradingEngine:
    """
    Main trading engine that processes decisions and manages game state.

    Owns the participant's Portfolio, a ScenarioManager that sequences the
    scenarios, and the list of DecisionOutcome records made so far.
    """

    _VALID_DECISIONS = ("BUY", "SELL", "HOLD")

    def __init__(self, config: ExperimentConfig = DEFAULT_CONFIG):
        """Create an engine with no active game; call start_new_game() next."""
        self.config = config
        self.portfolio: Optional[Portfolio] = None
        self.scenario_manager = ScenarioManager(config)
        self.decisions_made: List[DecisionOutcome] = []

    def start_new_game(self) -> Tuple[Portfolio, Optional[Scenario]]:
        """
        Start a new trading game.

        Creates a fresh all-cash portfolio from
        ``config.initial_portfolio_value``, initializes a shuffled scenario
        session and clears previously recorded decisions.

        Returns the initial portfolio and first scenario. The scenario is
        None when the session contains no scenarios.
        """
        self.portfolio = Portfolio(
            cash=self.config.initial_portfolio_value,
            initial_value=self.config.initial_portfolio_value
        )
        self.scenario_manager.initialize_session(shuffle=True)
        self.decisions_made = []
        first_scenario = self.scenario_manager.get_current_scenario()
        return self.portfolio, first_scenario

    def process_decision(
        self,
        scenario: Scenario,
        decision: str,  # "BUY", "SELL", "HOLD"
        trade_amount: float,
        ai_recommendation: str
    ) -> DecisionOutcome:
        """
        Process a trading decision and return the outcome.

        Args:
            scenario: Scenario being decided on; its ``outcome_buy``,
                ``outcome_sell`` and ``outcome_hold`` give the outcome rate
                for each choice.
            decision: "BUY", "SELL" or "HOLD" (case-insensitive).
            trade_amount: Amount the outcome rate is applied to.
            ai_recommendation: The AI's recommended action; compared
                case-insensitively with ``decision``.

        Returns:
            The DecisionOutcome, which is also appended to ``decisions_made``.

        Raises:
            RuntimeError: If no game has been started.
            ValueError: If ``decision`` is not one of the valid actions.
        """
        if self.portfolio is None:
            raise RuntimeError(
                "No active game: call start_new_game() before process_decision()"
            )

        # Validate decision
        decision = decision.upper()
        if decision not in self._VALID_DECISIONS:
            raise ValueError(f"Invalid decision: {decision}")

        # Get outcome rate based on decision
        outcome_map = {
            "BUY": scenario.outcome_buy,
            "SELL": scenario.outcome_sell,
            "HOLD": scenario.outcome_hold
        }
        outcome_pct = outcome_map[decision]

        portfolio_before = self.portfolio.total_value

        # The scenario already supplies a separate rate per action, so every
        # decision uses the same formula. No share positions are tracked;
        # the gain or loss goes straight to cash.
        outcome_amount = trade_amount * outcome_pct

        # Update portfolio
        self.portfolio.cash += outcome_amount
        portfolio_after = self.portfolio.total_value

        # Record state
        self.portfolio.record_state(scenario.scenario_id, decision, outcome_pct)

        # Normalize the recommendation the same way as the decision so a
        # lower-case "buy" still counts as a match.
        normalized_recommendation = (ai_recommendation or "").strip().upper()
        followed_ai = (decision == normalized_recommendation)
        was_optimal = (decision == scenario.optimal_action)

        outcome = DecisionOutcome(
            scenario_id=scenario.scenario_id,
            decision=decision,
            outcome_percentage=outcome_pct,
            outcome_amount=outcome_amount,
            portfolio_before=portfolio_before,
            portfolio_after=portfolio_after,
            ai_was_correct=scenario.ai_is_correct,
            followed_ai=followed_ai,
            was_optimal=was_optimal
        )
        self.decisions_made.append(outcome)
        return outcome

    def get_next_scenario(self) -> Optional[Scenario]:
        """Advance the session and return the next scenario, if any."""
        return self.scenario_manager.advance_to_next()

    def is_game_complete(self) -> bool:
        """Check if the game is complete."""
        return self.scenario_manager.is_complete()

    def get_game_summary(self) -> Dict[str, Any]:
        """Get a summary of the game so far.

        Returns an empty dict when no game has been started. Rates are
        fractions in [0, 1] and are 0 when no decisions have been made.
        """
        if not self.portfolio:
            return {}

        total_decisions = len(self.decisions_made)
        ai_followed_count = sum(1 for d in self.decisions_made if d.followed_ai)
        optimal_count = sum(1 for d in self.decisions_made if d.was_optimal)

        # Split decisions by whether the AI advice was correct, so reliance
        # on good and bad advice can be reported separately.
        ai_correct_decisions = [d for d in self.decisions_made if d.ai_was_correct]
        ai_wrong_decisions = [d for d in self.decisions_made if not d.ai_was_correct]
        followed_when_correct = sum(1 for d in ai_correct_decisions if d.followed_ai)
        followed_when_wrong = sum(1 for d in ai_wrong_decisions if d.followed_ai)

        return {
            "initial_portfolio": self.portfolio.initial_value,
            "final_portfolio": self.portfolio.total_value,
            "total_return": self.portfolio.total_value - self.portfolio.initial_value,
            "return_percentage": self.portfolio.return_percentage,
            "total_decisions": total_decisions,
            "ai_follow_rate": ai_followed_count / total_decisions if total_decisions > 0 else 0,
            "optimal_decision_rate": optimal_count / total_decisions if total_decisions > 0 else 0,
            "followed_correct_ai": followed_when_correct,
            "followed_incorrect_ai": followed_when_wrong,
            "ai_correct_scenarios": len(ai_correct_decisions),
            "ai_incorrect_scenarios": len(ai_wrong_decisions),
            "decisions": [
                {
                    "scenario": d.scenario_id,
                    "decision": d.decision,
                    "outcome": f"{d.outcome_percentage * 100:+.1f}%",
                    "followed_ai": d.followed_ai,
                    "was_optimal": d.was_optimal
                }
                for d in self.decisions_made
            ]
        }

    def get_progress_info(self) -> Dict[str, Any]:
        """Get current progress information.

        The scenario number is capped at the total so the reported
        percentage never exceeds 100 once the session is finished.
        """
        current, total = self.scenario_manager.get_progress()
        current = min(current, total)
        return {
            "current_scenario": current,
            "total_scenarios": total,
            "progress_percentage": (current / total) * 100 if total > 0 else 0,
            "portfolio_value": self.portfolio.total_value if self.portfolio else 0,
            "portfolio_return": self.portfolio.return_percentage if self.portfolio else 0
        }
def calculate_suggested_trade_amount(
    portfolio_value: float,
    risk_level: int = 50
) -> float:
    """
    Calculate a suggested trade amount based on portfolio and risk level.

    The suggestion scales linearly from 10% of the portfolio at risk level 0
    to 30% at risk level 100. Risk levels outside 0-100 are clamped so the
    result always stays inside that band.

    Args:
        portfolio_value: Current total portfolio value.
        risk_level: Risk appetite on a 0-100 scale.

    Returns:
        The suggested amount, rounded to 2 decimal places.
    """
    # Base: 10-30% of portfolio depending on risk level
    min_pct = 0.10
    max_pct = 0.30
    # Clamp so an out-of-range level cannot push the share past the band.
    risk_factor = min(max(risk_level, 0), 100) / 100
    suggested_pct = min_pct + (max_pct - min_pct) * risk_factor
    return round(portfolio_value * suggested_pct, 2)
def format_currency(amount: float, symbol: str = "credits") -> str:
    """Render an amount with thousands separators, 2 decimals and a unit.

    Negative amounts are shown with a leading minus sign in front of the
    absolute value; the unit ``symbol`` follows after a single space.
    """
    if amount >= 0:
        prefix, magnitude = "", amount
    else:
        prefix, magnitude = "-", abs(amount)
    return f"{prefix}{magnitude:,.2f} {symbol}"
def format_percentage(value: float, include_sign: bool = True) -> str:
    """Render a fractional value as a percentage with one decimal place.

    ``value`` is multiplied by 100 before formatting. When ``include_sign``
    is true the result always carries an explicit "+" or "-" sign.
    """
    spec = "+.1f" if include_sign else ".1f"
    return format(value * 100, spec) + "%"