File size: 4,318 Bytes
cafdd88
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import pandas as pd
import numpy as np
from typing import List, Dict, Optional
from datetime import date, timedelta
import logging
from core.schema import TaxLot, HarvestOpportunity, TickerData

logger = logging.getLogger(__name__)

class TaxEngine:
    """
    Identifies tax-loss harvesting opportunities and suggests proxies
    to avoid Wash Sale violations.
    """
    
    def __init__(self, risk_model=None):
        self.risk_model = risk_model
        
    def check_wash_sale_rule(self, symbol: str, transaction_date: date, 
                             recent_transactions: List[Dict]) -> bool:
        """
        Checks if a sale would trigger a wash sale based on purchases 
        within +/- 30 days. 
        """
        # Simplified simulation: Look for any 'buy' of this symbol in last 30 days
        limit_date = transaction_date - timedelta(days=30)
        
        for txn in recent_transactions:
            if txn['symbol'] == symbol and txn['type'] == 'buy':
                txn_date = txn['date']
                if txn_date >= limit_date and txn_date <= transaction_date:
                    return True
        return False

    def find_proxy(self, loser_ticker: str, sector: str, 
                   candidate_tickers: List[TickerData],
                   correlation_matrix: Optional[pd.DataFrame] = None) -> str:
        """
        Finds a suitable proxy stock in the same sector.
        Ideally high correlation (to maintain tracking) but not "substantially identical".
        """
        # Filter for same sector
        sector_peers = [t.symbol for t in candidate_tickers if t.sector == sector and t.symbol != loser_ticker]
        
        if not sector_peers:
            return "SPY" # Fallback
            
        if correlation_matrix is not None and not correlation_matrix.empty:
            try:
                # Get correlations for the loser ticker
                if loser_ticker in correlation_matrix.index:
                    corrs = correlation_matrix[loser_ticker]
                    # Filter for sector peers
                    peer_corrs = corrs[corrs.index.isin(sector_peers)]
                    # Sort desc, pick top
                    if not peer_corrs.empty:
                        best_proxy = peer_corrs.idxmax()
                        logger.info(f"Found proxy for {loser_ticker} using correlation: {best_proxy} (corr: {peer_corrs.max():.2f})")
                        return best_proxy
            except Exception as e:
                logger.warning(f"Correlation lookup failed: {e}. Falling back to random peer.")
        
        # Fallback: Pick a random peer in the sector
        return sector_peers[0]

    def harvest_losses(self, portfolio_lots: List[TaxLot], 
                       market_prices: Dict[str, float],
                       candidate_tickers: List[TickerData],
                       correlation_matrix: Optional[pd.DataFrame] = None) -> List[HarvestOpportunity]:
        """
        Scans portfolio for lots with > 10% Unrealized Loss.
        """
        opportunities = []
        
        for lot in portfolio_lots:
            # Update current price if available
            if lot.symbol in market_prices:
                lot.current_price = market_prices[lot.symbol]
            
            # Check threshold (e.g. -10%)
            if lot.loss_percentage <= -0.10:
                # Find Proxy
                # Need to find the sector for this ticker from candidate_tickers
                ticker_obj = next((t for t in candidate_tickers if t.symbol == lot.symbol), None)
                sector = ticker_obj.sector if ticker_obj else "Unknown"
                
                proxy = self.find_proxy(lot.symbol, sector, candidate_tickers, correlation_matrix)
                
                opp = HarvestOpportunity(
                    sell_ticker=lot.symbol,
                    buy_proxy_ticker=proxy,
                    quantity=lot.quantity,
                    estimated_loss_harvested=abs(lot.unrealized_pl),
                    reason=f"Loss of {lot.loss_percentage*100:.1f}% exceeds 10% threshold."
                )
                opportunities.append(opp)
                
        logger.info(f"Identified {len(opportunities)} tax-loss harvesting opportunities.")
        return opportunities