Spaces:
Running
Running
| import pandas as pd | |
| import numpy as np | |
| from typing import List, Dict, Optional | |
| from datetime import date, timedelta | |
| import logging | |
| from core.schema import TaxLot, HarvestOpportunity, TickerData | |
| logger = logging.getLogger(__name__) | |
| class TaxEngine: | |
| """ | |
| Identifies tax-loss harvesting opportunities and suggests proxies | |
| to avoid Wash Sale violations. | |
| """ | |
| def __init__(self, risk_model=None): | |
| self.risk_model = risk_model | |
| def check_wash_sale_rule(self, symbol: str, transaction_date: date, | |
| recent_transactions: List[Dict]) -> bool: | |
| """ | |
| Checks if a sale would trigger a wash sale based on purchases | |
| within +/- 30 days. | |
| """ | |
| # Simplified simulation: Look for any 'buy' of this symbol in last 30 days | |
| limit_date = transaction_date - timedelta(days=30) | |
| for txn in recent_transactions: | |
| if txn['symbol'] == symbol and txn['type'] == 'buy': | |
| txn_date = txn['date'] | |
| if txn_date >= limit_date and txn_date <= transaction_date: | |
| return True | |
| return False | |
| def find_proxy(self, loser_ticker: str, sector: str, | |
| candidate_tickers: List[TickerData], | |
| correlation_matrix: Optional[pd.DataFrame] = None) -> str: | |
| """ | |
| Finds a suitable proxy stock in the same sector. | |
| Ideally high correlation (to maintain tracking) but not "substantially identical". | |
| """ | |
| # Filter for same sector | |
| sector_peers = [t.symbol for t in candidate_tickers if t.sector == sector and t.symbol != loser_ticker] | |
| if not sector_peers: | |
| return "SPY" # Fallback | |
| if correlation_matrix is not None and not correlation_matrix.empty: | |
| try: | |
| # Get correlations for the loser ticker | |
| if loser_ticker in correlation_matrix.index: | |
| corrs = correlation_matrix[loser_ticker] | |
| # Filter for sector peers | |
| peer_corrs = corrs[corrs.index.isin(sector_peers)] | |
| # Sort desc, pick top | |
| if not peer_corrs.empty: | |
| best_proxy = peer_corrs.idxmax() | |
| logger.info(f"Found proxy for {loser_ticker} using correlation: {best_proxy} (corr: {peer_corrs.max():.2f})") | |
| return best_proxy | |
| except Exception as e: | |
| logger.warning(f"Correlation lookup failed: {e}. Falling back to random peer.") | |
| # Fallback: Pick a random peer in the sector | |
| return sector_peers[0] | |
| def harvest_losses(self, portfolio_lots: List[TaxLot], | |
| market_prices: Dict[str, float], | |
| candidate_tickers: List[TickerData], | |
| correlation_matrix: Optional[pd.DataFrame] = None) -> List[HarvestOpportunity]: | |
| """ | |
| Scans portfolio for lots with > 10% Unrealized Loss. | |
| """ | |
| opportunities = [] | |
| for lot in portfolio_lots: | |
| # Update current price if available | |
| if lot.symbol in market_prices: | |
| lot.current_price = market_prices[lot.symbol] | |
| # Check threshold (e.g. -10%) | |
| if lot.loss_percentage <= -0.10: | |
| # Find Proxy | |
| # Need to find the sector for this ticker from candidate_tickers | |
| ticker_obj = next((t for t in candidate_tickers if t.symbol == lot.symbol), None) | |
| sector = ticker_obj.sector if ticker_obj else "Unknown" | |
| proxy = self.find_proxy(lot.symbol, sector, candidate_tickers, correlation_matrix) | |
| opp = HarvestOpportunity( | |
| sell_ticker=lot.symbol, | |
| buy_proxy_ticker=proxy, | |
| quantity=lot.quantity, | |
| estimated_loss_harvested=abs(lot.unrealized_pl), | |
| reason=f"Loss of {lot.loss_percentage*100:.1f}% exceeds 10% threshold." | |
| ) | |
| opportunities.append(opp) | |
| logger.info(f"Identified {len(opportunities)} tax-loss harvesting opportunities.") | |
| return opportunities | |