| |
| """ |
| Validation script for P&L calculations using real portfolio data. |
| |
| This script loads portfolio data from private-data/, processes the options, |
| and validates P&L calculations for a position group (e.g., SPY options). |
| """ |
|
|
| import logging |
| import os |
| import sys |
| from pathlib import Path |
|
|
| import matplotlib.pyplot as plt |
| import numpy as np |
| import pandas as pd |
|
|
| |
# Put the directory two levels above this file on the import path so the
# `src.folio` imports below resolve when the script is run directly.
_script_path = os.path.abspath(__file__)
_project_root = os.path.dirname(os.path.dirname(_script_path))
sys.path.append(_project_root)
|
|
| from src.folio.pnl import ( |
| calculate_strategy_pnl, |
| determine_price_range, |
| summarize_strategy_pnl, |
| ) |
| from src.folio.portfolio import process_portfolio_data |
|
|
| |
# Root logging: INFO and above, each record stamped with time, logger name
# and level. All messages in this script go through the module logger below.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
|
|
|
|
def print_position_details(positions):
    """
    Log a one-line description of each position for debugging.

    Records are emitted on the module logger at DEBUG level, so nothing is
    shown under the INFO level this script configures unless the level is
    lowered.

    Args:
        positions: Iterable of position objects. Every attribute is read with
            ``getattr`` and a placeholder default, so an object that lacks one
            of them is still described instead of raising ``AttributeError``.
    """
    for index, pos in enumerate(positions, start=1):
        position_type = getattr(pos, "position_type", "unknown")
        ticker = getattr(pos, "ticker", "n/a")
        quantity = getattr(pos, "quantity", "n/a")

        if position_type == "stock":
            logger.debug(
                "Position %d: stock %s, quantity=%s, price=%s",
                index,
                ticker,
                quantity,
                getattr(pos, "price", "n/a"),
            )
        elif position_type == "option":
            # NOTE(review): option_type/strike attribute names mirror the keys
            # used for per-position P&L entries elsewhere in this script;
            # confirm they match the option position class.
            logger.debug(
                "Position %d: option %s %s strike=%s, quantity=%s, notional=%s",
                index,
                ticker,
                getattr(pos, "option_type", "n/a"),
                getattr(pos, "strike", "n/a"),
                quantity,
                getattr(pos, "notional_value", "n/a"),
            )
        else:
            # Unrecognised type: fall back to the object's repr.
            logger.debug("Position %d: %s position %r", index, position_type, pos)
|
|
|
|
def _infer_current_price(group):
    """Return the underlying price implied by the group's positions, or None."""
    if group.stock_position:
        return group.stock_position.price

    if group.option_positions:
        first_option = group.option_positions[0]
        if hasattr(first_option, "notional_value") and hasattr(
            first_option, "quantity"
        ):
            abs_quantity = abs(first_option.quantity)
            # Guard against division by zero for a flat (zero-quantity) option.
            if abs_quantity > 0:
                # NOTE(review): presumably notional = 100 shares per contract
                # * contracts * underlying price -- confirm against the
                # option position model.
                return first_option.notional_value / (100 * abs_quantity)

    return None


def validate_pnl_for_group(group, num_points=100):
    """
    Compute strategy P&L for a position group in both calculation modes.

    The group's stock position (if any) and all option positions are evaluated
    over the range returned by ``determine_price_range``, once with
    ``use_cost_basis=False`` and once with ``use_cost_basis=True``.

    Args:
        group: Position group exposing ``ticker``, ``stock_position`` and
            ``option_positions``.
        num_points: Number of price points passed to
            ``calculate_strategy_pnl`` (defaults to the previous fixed 100).

    Returns:
        A ``(results, current_price)`` tuple. ``results`` maps ``"default"``
        and ``"cost_basis"`` to ``(pnl_data, summary)`` pairs. When the group
        holds no positions the tuple is ``(None, None)``, so callers that
        unpack two values keep working.
    """
    all_positions = []
    if group.stock_position:
        all_positions.append(group.stock_position)
    all_positions.extend(group.option_positions)

    if not all_positions:
        logger.warning(f"No positions found in group {group.ticker}")
        # Keep the two-element shape: returning a bare None here made the
        # caller's tuple unpacking raise TypeError.
        return None, None

    print_position_details(all_positions)

    current_price = _infer_current_price(group)
    if current_price is None:
        # Last-resort placeholder so the price range can still be built.
        current_price = 100.0
        logger.warning(
            f"Could not determine current price for {group.ticker}, using default: ${current_price:.2f}"
        )
    else:
        logger.info(f"Using current price for {group.ticker}: ${current_price:.2f}")

    price_range = determine_price_range(all_positions, current_price)
    logger.info(f"Price range: ${price_range[0]:.2f} to ${price_range[1]:.2f}")

    # Same calculation and summary for each mode; only the flag differs.
    results = {}
    for mode, use_cost_basis in (("default", False), ("cost_basis", True)):
        pnl_data = calculate_strategy_pnl(
            all_positions,
            price_range=price_range,
            num_points=num_points,
            use_cost_basis=use_cost_basis,
        )
        results[mode] = (pnl_data, summarize_strategy_pnl(pnl_data, current_price))

    return results, current_price
|
|
|
|
def plot_pnl(
    pnl_data, summary, current_price, ticker, mode="default", output_dir=".tmp"
):
    """
    Plot a strategy's P&L curve with annotations and save it as a PNG.

    The figure is closed even when plotting or saving raises, so a failure for
    one ticker does not leave an open figure behind for the rest of the run.

    Args:
        pnl_data: Mapping with ``"price_points"`` and ``"pnl_values"`` for the
            whole strategy and, optionally, ``"individual_pnls"``: one mapping
            per position with its own ``"price_points"``/``"pnl_values"`` and
            an optional ``"position"`` mapping used for the legend label.
        summary: Mapping with ``"breakeven_points"``, ``"max_profit_price"``,
            ``"max_profit"``, ``"max_loss_price"``, ``"max_loss"`` and
            ``"current_pnl"``.
        current_price: Current price of the underlying (vertical marker).
        ticker: Ticker symbol, used in labels and the output file name.
        mode: ``"cost_basis"`` titles the chart "Using Cost Basis"; any other
            value titles it "Using Current Price". Also part of the file name.
        output_dir: Directory for the PNG; created if it does not exist.

    Raises:
        KeyError: If ``pnl_data`` or ``summary`` lacks a required key.
    """
    os.makedirs(output_dir, exist_ok=True)

    fig = plt.figure(figsize=(12, 8))
    try:
        # Combined strategy curve.
        plt.plot(
            pnl_data["price_points"],
            pnl_data["pnl_values"],
            "b-",
            linewidth=2,
            label=f"{ticker} Strategy P&L",
        )

        # Faint dashed curve per position, when a breakdown is available.
        if "individual_pnls" in pnl_data:
            for i, pos_pnl in enumerate(pnl_data["individual_pnls"]):
                pos_desc = pos_pnl.get("position", {}).get(
                    "ticker", f"Position {i + 1}"
                )
                plt.plot(
                    pos_pnl["price_points"],
                    pos_pnl["pnl_values"],
                    "--",
                    linewidth=1,
                    alpha=0.5,
                    label=pos_desc,
                )

        # Reference lines: zero P&L and the current underlying price.
        plt.axhline(y=0, color="r", linestyle="-", alpha=0.3)
        plt.axvline(
            x=current_price,
            color="g",
            linestyle="--",
            alpha=0.5,
            label=f"Current Price: ${current_price:.2f}",
        )

        for bp in summary["breakeven_points"]:
            plt.axvline(x=bp, color="orange", linestyle=":", alpha=0.5)
            plt.text(bp, 0, f"BE: ${bp:.2f}", rotation=90, verticalalignment="center")

        # (x, y, marker style, label, vertical align, horizontal align)
        markers = (
            (
                summary["max_profit_price"],
                summary["max_profit"],
                "go",
                "Max Profit",
                "bottom",
                "center",
            ),
            (
                summary["max_loss_price"],
                summary["max_loss"],
                "ro",
                "Max Loss",
                "top",
                "center",
            ),
            (
                current_price,
                summary["current_pnl"],
                "yo",
                "Current P&L",
                "bottom",
                "right",
            ),
        )
        for x, y, style, label, valign, halign in markers:
            plt.plot(x, y, style, markersize=8)
            plt.text(
                x,
                y,
                f"{label}: ${y:.2f}",
                verticalalignment=valign,
                horizontalalignment=halign,
            )

        mode_label = (
            "Using Cost Basis" if mode == "cost_basis" else "Using Current Price"
        )
        plt.title(f"P&L Analysis for {ticker} Position Group ({mode_label})")
        plt.xlabel(f"{ticker} Price")
        plt.ylabel("P&L ($)")
        plt.grid(True, alpha=0.3)
        plt.legend(loc="best")

        output_file = os.path.join(
            output_dir, f"{ticker.lower()}_pnl_validation_{mode}.png"
        )
        plt.savefig(output_file)
        logger.info(f"P&L plot saved to {output_file}")
    finally:
        # Close this specific figure on success and on failure alike.
        plt.close(fig)
|
|
|
|
def _position_label(pos_pnl, index):
    """Build a short label for one per-position P&L entry."""
    position = pos_pnl.get("position", {})
    label = position.get("ticker", f"Position {index + 1}")
    if position.get("position_type", "unknown") == "option":
        option_type = position.get("option_type", "")
        strike = position.get("strike", 0)
        label = f"{label} {option_type} {strike}"
    return f"{label} (qty {position.get('quantity', 0)})"


def _log_pnl_breakdown(pnl_data, target_price, heading):
    """Log each position's P&L at the grid price nearest ``target_price``.

    Returns the sum of the per-position values, or None when ``pnl_data`` has
    no ``"individual_pnls"`` entry.
    """
    if "individual_pnls" not in pnl_data:
        logger.info(f"{heading}: no per-position P&L available")
        return None

    price_points = np.asarray(pnl_data["price_points"])
    # Nearest grid point; the target rarely falls exactly on the grid.
    idx = int(np.abs(price_points - target_price).argmin())

    logger.info(f"{heading} (grid price ${price_points[idx]:.2f}):")
    total = 0
    for i, pos_pnl in enumerate(pnl_data["individual_pnls"]):
        value = pos_pnl["pnl_values"][idx]
        total += value
        logger.info(f"  {_position_label(pos_pnl, i)}: ${value:.2f}")

    # The per-position sum is reported next to the strategy curve's value at
    # the same grid point so a mismatch is visible in the log.
    strategy_value = pnl_data["pnl_values"][idx]
    logger.info(
        f"  Sum of positions: ${total:.2f} | strategy P&L: ${strategy_value:.2f}"
    )
    return total


def main():
    """
    Load portfolio data, process options, and validate P&L calculations.

    Picks the ``pf-*.csv`` file in ``private-data/`` that sorts last by path,
    processes it into position groups, and for the first ticker in the
    analysis list that yields P&L data: logs a per-position breakdown at the
    max-profit and current prices, then writes one plot per calculation mode.
    Problems are logged rather than raised.
    """
    private_data_dir = Path("private-data")
    if not private_data_dir.exists():
        logger.error(f"Private data directory not found: {private_data_dir}")
        return

    # Reverse path order; NOTE(review): presumably file names embed a sortable
    # date so the first entry is the newest -- confirm the naming scheme.
    portfolio_files = sorted(private_data_dir.glob("pf-*.csv"), reverse=True)
    if not portfolio_files:
        logger.error(f"No portfolio files found in {private_data_dir}")
        return

    portfolio_file = portfolio_files[0]
    logger.info(f"Using portfolio file: {portfolio_file}")

    df = pd.read_csv(portfolio_file)
    logger.info(f"Loaded portfolio data with {len(df)} positions")

    try:
        result = process_portfolio_data(df)
        if isinstance(result, tuple) and len(result) >= 2:
            # Only the groups are needed; the portfolio summary is not used.
            groups = result[0]
            logger.info(
                f"Portfolio data processed successfully with {len(groups)} groups"
            )
        else:
            logger.error("Unexpected result format from process_portfolio_data")
            return
    except Exception as e:
        logger.error(f"Error processing portfolio data: {e}")
        return

    tickers_to_analyze = ["META", "AMZN", "GOOGL", "NVDA", "QQQ", "SPY"]

    found_group = False
    for ticker in tickers_to_analyze:
        group = next((g for g in groups if g.ticker == ticker), None)
        if group is None:
            logger.warning(f"No {ticker} position group found in portfolio")
            continue

        found_group = True
        logger.info(f"Found {ticker} position group")
        if group.option_positions:
            logger.info(f"  Option positions: {len(group.option_positions)}")
        if group.stock_position:
            logger.info(f"  Stock position: {group.stock_position.quantity} shares")

        try:
            # Accept both a bare None and a (None, None) tuple for the
            # "nothing to validate" case instead of failing on unpacking.
            validation = validate_pnl_for_group(group)
            pnl_results, current_price = (
                validation if validation is not None else (None, None)
            )
            if not pnl_results:
                logger.warning(f"No P&L data generated for {ticker}")
                continue

            pnl_data, summary = pnl_results["default"]
            pnl_data_cost_basis, summary_cost_basis = pnl_results["cost_basis"]

            _log_pnl_breakdown(
                pnl_data,
                summary["max_profit_price"],
                "P&L by position at max-profit price",
            )
            _log_pnl_breakdown(
                pnl_data,
                current_price,
                "P&L by position at current price (default mode)",
            )
            # Cost-basis breakdown must read the cost-basis data set, not the
            # default-mode one.
            _log_pnl_breakdown(
                pnl_data_cost_basis,
                current_price,
                "P&L by position at current price (cost basis mode)",
            )

            plot_pnl(pnl_data, summary, current_price, ticker, mode="default")
            plot_pnl(
                pnl_data_cost_basis,
                summary_cost_basis,
                current_price,
                ticker,
                mode="cost_basis",
            )

            logger.info(f"P&L validation completed for {ticker} in both modes")
            # One successfully validated group is enough for this script.
            break
        except Exception as e:
            # Boundary handler: log with traceback and try the next ticker.
            logger.error(f"Error validating P&L for {ticker}: {e}", exc_info=True)

    if not found_group:
        logger.error(
            "No valid position groups found for analysis. Please check the portfolio data."
        )
|
|
|
|
# Run the validation only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
|