|
|
|
|
|
""" |
|
|
Validation script for P&L calculations using real portfolio data. |
|
|
|
|
|
This script loads portfolio data from private-data/, processes the options, |
|
|
and validates P&L calculations for a position group (e.g., SPY options). |
|
|
""" |
|
|
|
|
|
import logging |
|
|
import os |
|
|
import sys |
|
|
from pathlib import Path |
|
|
|
|
|
import matplotlib.pyplot as plt |
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
|
|
|
|
|
|
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
|
|
|
|
|
from src.folio.pnl import ( |
|
|
calculate_strategy_pnl, |
|
|
determine_price_range, |
|
|
summarize_strategy_pnl, |
|
|
) |
|
|
from src.folio.portfolio import process_portfolio_data |
|
|
|
|
|
|
|
|
logging.basicConfig( |
|
|
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" |
|
|
) |
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
def print_position_details(positions): |
|
|
""" |
|
|
Print details of positions for debugging. |
|
|
|
|
|
Args: |
|
|
positions: List of positions |
|
|
""" |
|
|
|
|
|
for _i, pos in enumerate(positions): |
|
|
position_type = getattr(pos, "position_type", "unknown") |
|
|
if position_type == "stock": |
|
|
pass |
|
|
elif position_type == "option": |
|
|
pass |
|
|
else: |
|
|
pass |
|
|
|
|
|
|
|
|
def validate_pnl_for_group(group): |
|
|
""" |
|
|
Validate P&L calculations for a position group. |
|
|
|
|
|
Args: |
|
|
group: Position group |
|
|
|
|
|
Returns: |
|
|
P&L data |
|
|
""" |
|
|
|
|
|
all_positions = [] |
|
|
if group.stock_position: |
|
|
all_positions.append(group.stock_position) |
|
|
|
|
|
all_positions.extend(group.option_positions) |
|
|
|
|
|
if not all_positions: |
|
|
logger.warning(f"No positions found in group {group.ticker}") |
|
|
return None |
|
|
|
|
|
|
|
|
print_position_details(all_positions) |
|
|
|
|
|
|
|
|
current_price = None |
|
|
if group.stock_position: |
|
|
current_price = group.stock_position.price |
|
|
elif group.option_positions: |
|
|
|
|
|
first_option = group.option_positions[0] |
|
|
if hasattr(first_option, "notional_value") and hasattr( |
|
|
first_option, "quantity" |
|
|
): |
|
|
|
|
|
abs_quantity = abs(first_option.quantity) |
|
|
if abs_quantity > 0: |
|
|
current_price = first_option.notional_value / (100 * abs_quantity) |
|
|
|
|
|
if current_price is None: |
|
|
|
|
|
current_price = 100.0 |
|
|
logger.warning( |
|
|
f"Could not determine current price for {group.ticker}, using default: ${current_price:.2f}" |
|
|
) |
|
|
else: |
|
|
logger.info(f"Using current price for {group.ticker}: ${current_price:.2f}") |
|
|
|
|
|
|
|
|
price_range = determine_price_range(all_positions, current_price) |
|
|
logger.info(f"Price range: ${price_range[0]:.2f} to ${price_range[1]:.2f}") |
|
|
|
|
|
|
|
|
pnl_data_default = calculate_strategy_pnl( |
|
|
all_positions, price_range=price_range, num_points=100, use_cost_basis=False |
|
|
) |
|
|
|
|
|
|
|
|
summary_default = summarize_strategy_pnl(pnl_data_default, current_price) |
|
|
|
|
|
|
|
|
pnl_data_cost_basis = calculate_strategy_pnl( |
|
|
all_positions, price_range=price_range, num_points=100, use_cost_basis=True |
|
|
) |
|
|
|
|
|
|
|
|
summary_cost_basis = summarize_strategy_pnl(pnl_data_cost_basis, current_price) |
|
|
|
|
|
|
|
|
return { |
|
|
"default": (pnl_data_default, summary_default), |
|
|
"cost_basis": (pnl_data_cost_basis, summary_cost_basis), |
|
|
}, current_price |
|
|
|
|
|
|
|
|
def plot_pnl( |
|
|
pnl_data, summary, current_price, ticker, mode="default", output_dir=".tmp" |
|
|
): |
|
|
""" |
|
|
Plot P&L data and save to file. |
|
|
|
|
|
Args: |
|
|
pnl_data: P&L data from calculate_strategy_pnl |
|
|
summary: Summary data from summarize_strategy_pnl |
|
|
current_price: Current price of the underlying |
|
|
ticker: Ticker symbol |
|
|
mode: Mode used for P&L calculation ("default" or "cost_basis") |
|
|
output_dir: Directory to save plot |
|
|
""" |
|
|
|
|
|
os.makedirs(output_dir, exist_ok=True) |
|
|
|
|
|
|
|
|
plt.figure(figsize=(12, 8)) |
|
|
|
|
|
|
|
|
plt.plot( |
|
|
pnl_data["price_points"], |
|
|
pnl_data["pnl_values"], |
|
|
"b-", |
|
|
linewidth=2, |
|
|
label=f"{ticker} Strategy P&L", |
|
|
) |
|
|
|
|
|
|
|
|
if "individual_pnls" in pnl_data: |
|
|
for i, pos_pnl in enumerate(pnl_data["individual_pnls"]): |
|
|
pos_desc = pos_pnl.get("position", {}).get("ticker", f"Position {i + 1}") |
|
|
plt.plot( |
|
|
pos_pnl["price_points"], |
|
|
pos_pnl["pnl_values"], |
|
|
"--", |
|
|
linewidth=1, |
|
|
alpha=0.5, |
|
|
label=pos_desc, |
|
|
) |
|
|
|
|
|
|
|
|
plt.axhline(y=0, color="r", linestyle="-", alpha=0.3) |
|
|
plt.axvline( |
|
|
x=current_price, |
|
|
color="g", |
|
|
linestyle="--", |
|
|
alpha=0.5, |
|
|
label=f"Current Price: ${current_price:.2f}", |
|
|
) |
|
|
|
|
|
|
|
|
for bp in summary["breakeven_points"]: |
|
|
plt.axvline(x=bp, color="orange", linestyle=":", alpha=0.5) |
|
|
plt.text(bp, 0, f"BE: ${bp:.2f}", rotation=90, verticalalignment="center") |
|
|
|
|
|
|
|
|
max_profit_price = summary["max_profit_price"] |
|
|
max_profit = summary["max_profit"] |
|
|
plt.plot(max_profit_price, max_profit, "go", markersize=8) |
|
|
plt.text( |
|
|
max_profit_price, |
|
|
max_profit, |
|
|
f"Max Profit: ${max_profit:.2f}", |
|
|
verticalalignment="bottom", |
|
|
horizontalalignment="center", |
|
|
) |
|
|
|
|
|
max_loss_price = summary["max_loss_price"] |
|
|
max_loss = summary["max_loss"] |
|
|
plt.plot(max_loss_price, max_loss, "ro", markersize=8) |
|
|
plt.text( |
|
|
max_loss_price, |
|
|
max_loss, |
|
|
f"Max Loss: ${max_loss:.2f}", |
|
|
verticalalignment="top", |
|
|
horizontalalignment="center", |
|
|
) |
|
|
|
|
|
|
|
|
current_pnl = summary["current_pnl"] |
|
|
plt.plot(current_price, current_pnl, "yo", markersize=8) |
|
|
plt.text( |
|
|
current_price, |
|
|
current_pnl, |
|
|
f"Current P&L: ${current_pnl:.2f}", |
|
|
verticalalignment="bottom", |
|
|
horizontalalignment="right", |
|
|
) |
|
|
|
|
|
|
|
|
mode_label = "Using Cost Basis" if mode == "cost_basis" else "Using Current Price" |
|
|
plt.title(f"P&L Analysis for {ticker} Position Group ({mode_label})") |
|
|
plt.xlabel(f"{ticker} Price") |
|
|
plt.ylabel("P&L ($)") |
|
|
plt.grid(True, alpha=0.3) |
|
|
plt.legend(loc="best") |
|
|
|
|
|
|
|
|
output_file = os.path.join( |
|
|
output_dir, f"{ticker.lower()}_pnl_validation_{mode}.png" |
|
|
) |
|
|
plt.savefig(output_file) |
|
|
logger.info(f"P&L plot saved to {output_file}") |
|
|
|
|
|
|
|
|
plt.close() |
|
|
|
|
|
|
|
|
def main(): |
|
|
""" |
|
|
Load portfolio data, process options, and validate P&L calculations. |
|
|
""" |
|
|
|
|
|
private_data_dir = Path("private-data") |
|
|
if not private_data_dir.exists(): |
|
|
logger.error(f"Private data directory not found: {private_data_dir}") |
|
|
return |
|
|
|
|
|
portfolio_files = list(private_data_dir.glob("pf-*.csv")) |
|
|
if not portfolio_files: |
|
|
logger.error(f"No portfolio files found in {private_data_dir}") |
|
|
return |
|
|
|
|
|
|
|
|
portfolio_files.sort(reverse=True) |
|
|
portfolio_file = portfolio_files[0] |
|
|
logger.info(f"Using portfolio file: {portfolio_file}") |
|
|
|
|
|
|
|
|
df = pd.read_csv(portfolio_file) |
|
|
logger.info(f"Loaded portfolio data with {len(df)} positions") |
|
|
|
|
|
|
|
|
try: |
|
|
|
|
|
result = process_portfolio_data(df) |
|
|
if isinstance(result, tuple) and len(result) >= 2: |
|
|
groups, summary = result[0], result[1] |
|
|
logger.info( |
|
|
f"Portfolio data processed successfully with {len(groups)} groups" |
|
|
) |
|
|
else: |
|
|
logger.error("Unexpected result format from process_portfolio_data") |
|
|
return |
|
|
except Exception as e: |
|
|
logger.error(f"Error processing portfolio data: {e}") |
|
|
return |
|
|
|
|
|
|
|
|
tickers_to_analyze = ["META", "AMZN", "GOOGL", "NVDA", "QQQ", "SPY"] |
|
|
|
|
|
found_group = False |
|
|
for ticker in tickers_to_analyze: |
|
|
|
|
|
matching_groups = [g for g in groups if g.ticker == ticker] |
|
|
|
|
|
if not matching_groups: |
|
|
logger.warning(f"No {ticker} position group found in portfolio") |
|
|
continue |
|
|
|
|
|
group = matching_groups[0] |
|
|
found_group = True |
|
|
|
|
|
logger.info(f"Found {ticker} position group") |
|
|
if group.option_positions: |
|
|
logger.info(f" Option positions: {len(group.option_positions)}") |
|
|
if group.stock_position: |
|
|
logger.info(f" Stock position: {group.stock_position.quantity} shares") |
|
|
|
|
|
|
|
|
try: |
|
|
pnl_results, current_price = validate_pnl_for_group(group) |
|
|
if pnl_results: |
|
|
|
|
|
pnl_data, summary = pnl_results["default"] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for key in pnl_data.keys(): |
|
|
if key == "individual_pnls": |
|
|
for _i, _pos_pnl in enumerate(pnl_data[key]): |
|
|
pass |
|
|
|
|
|
|
|
|
max_profit_price = summary["max_profit_price"] |
|
|
max_profit_idx = 0 |
|
|
for i, price in enumerate(pnl_data["price_points"]): |
|
|
if abs(price - max_profit_price) < 0.01: |
|
|
max_profit_idx = i |
|
|
break |
|
|
|
|
|
total_pnl = 0 |
|
|
for i, pos_pnl in enumerate(pnl_data["individual_pnls"]): |
|
|
pos_desc = pos_pnl.get("position", {}).get( |
|
|
"ticker", f"Position {i + 1}" |
|
|
) |
|
|
pos_type = pos_pnl.get("position", {}).get( |
|
|
"position_type", "unknown" |
|
|
) |
|
|
pos_pnl.get("position", {}).get("quantity", 0) |
|
|
|
|
|
if pos_type == "option": |
|
|
option_type = pos_pnl.get("position", {}).get("option_type", "") |
|
|
strike = pos_pnl.get("position", {}).get("strike", 0) |
|
|
pos_desc = f"{pos_desc} {option_type} {strike}" |
|
|
|
|
|
pnl_at_max = pos_pnl["pnl_values"][max_profit_idx] |
|
|
total_pnl += pnl_at_max |
|
|
|
|
|
|
|
|
|
|
|
price_points = np.array(pnl_data["price_points"]) |
|
|
current_idx = np.abs(price_points - current_price).argmin() |
|
|
|
|
|
total_current_pnl = 0 |
|
|
for i, pos_pnl in enumerate(pnl_data["individual_pnls"]): |
|
|
pos_desc = pos_pnl.get("position", {}).get( |
|
|
"ticker", f"Position {i + 1}" |
|
|
) |
|
|
pos_type = pos_pnl.get("position", {}).get( |
|
|
"position_type", "unknown" |
|
|
) |
|
|
pos_pnl.get("position", {}).get("quantity", 0) |
|
|
|
|
|
if pos_type == "option": |
|
|
option_type = pos_pnl.get("position", {}).get("option_type", "") |
|
|
strike = pos_pnl.get("position", {}).get("strike", 0) |
|
|
pos_desc = f"{pos_desc} {option_type} {strike}" |
|
|
|
|
|
pnl_at_current = pos_pnl["pnl_values"][current_idx] |
|
|
total_current_pnl += pnl_at_current |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
pnl_data_cost_basis, summary_cost_basis = pnl_results["cost_basis"] |
|
|
|
|
|
|
|
|
for i, pos_pnl in enumerate(pnl_data["individual_pnls"]): |
|
|
pos_desc = pos_pnl.get("position", {}).get( |
|
|
"ticker", f"Position {i + 1}" |
|
|
) |
|
|
pos_type = pos_pnl.get("position", {}).get( |
|
|
"position_type", "unknown" |
|
|
) |
|
|
|
|
|
if pos_type == "option": |
|
|
option_type = pos_pnl.get("position", {}).get("option_type", "") |
|
|
strike = pos_pnl.get("position", {}).get("strike", 0) |
|
|
pos_desc = f"{pos_desc} {option_type} {strike}" |
|
|
|
|
|
pnl_at_current = pos_pnl["pnl_values"][current_idx] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
plot_pnl(pnl_data, summary, current_price, ticker, mode="default") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
plot_pnl( |
|
|
pnl_data_cost_basis, |
|
|
summary_cost_basis, |
|
|
current_price, |
|
|
ticker, |
|
|
mode="cost_basis", |
|
|
) |
|
|
|
|
|
logger.info(f"P&L validation completed for {ticker} in both modes") |
|
|
break |
|
|
else: |
|
|
logger.warning(f"No P&L data generated for {ticker}") |
|
|
except Exception as e: |
|
|
logger.error(f"Error validating P&L for {ticker}: {e}", exc_info=True) |
|
|
|
|
|
if not found_group: |
|
|
logger.error( |
|
|
"No valid position groups found for analysis. Please check the portfolio data." |
|
|
) |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
main() |
|
|
|