import logging
from typing import Any, Dict

import pandas as pd

from config import settings
from data.data_manager import MarketDataEngine
from analytics.risk_model import RiskModel
from data.optimizer import PortfolioOptimizer
from analytics.tax_module import TaxEngine
from analytics.attribution import AttributionEngine
from ai.ai_reporter import AIReporter
from core.schema import OptimizationRequest, TickerData

# Setup Logging
logging.basicConfig(
    level=settings.LOG_LEVEL,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger("QuantScaleAI")


class QuantScaleSystem:
    """End-to-end orchestrator: LLM intent parsing -> universe selection ->
    market data -> risk model -> benchmark proxy -> optimization ->
    attribution -> AI commentary.

    Each stage is delegated to a dedicated engine; this class only wires
    them together and carries intermediate state through ``run_pipeline``.
    """

    def __init__(self):
        self.data_engine = MarketDataEngine()
        self.risk_model = RiskModel()
        self.optimizer = PortfolioOptimizer()
        self.tax_engine = TaxEngine()
        self.attribution_engine = AttributionEngine()
        self.ai_reporter = AIReporter()

    def _select_universe(self, tickers, sorted_by_cap, market_drivers, request):
        """Choose which tickers to fetch data for, pre-filtering by strategy.

        Returns a tuple ``(fetch_list, targets)`` where ``targets`` is the
        strategy's target set (or None when no strategy applies). The
        "Market Drivers" (top-cap names) are always included so the
        benchmark proxy is realistic; otherwise TE would be ~0 because the
        benchmark universe would equal the portfolio universe.
        """
        if request.strategy and request.top_n:
            logger.info(
                f"Applying Strategy PRE-FETCH: {request.strategy} with Top N={request.top_n}"
            )
            if request.strategy == "smallest_market_cap":
                targets = sorted_by_cap[:request.top_n]
                fetch_list = list(set(targets + market_drivers))
                logger.info(
                    f"Fetching {len(fetch_list)} tickers (Targets + Drivers)"
                )
                return fetch_list, targets
            if request.strategy == "largest_market_cap":
                targets = sorted_by_cap[-request.top_n:]
                return list(set(targets + market_drivers)), targets
            # BUGFIX: an unrecognized strategy used to leave the fetch list
            # empty, aborting the pipeline with a misleading "no market
            # data" error. Fall through to the demo default instead.
            logger.warning(
                "Unknown strategy '%s'; falling back to default universe",
                request.strategy,
            )
        # Default safety limit for Demo
        return tickers[:60], None

    def run_pipeline(self, request: OptimizationRequest):
        """Run the full optimization pipeline for one client request.

        Returns a dict with the optimization result, attribution report,
        AI commentary and supporting data, or None when no market data
        could be fetched.
        """
        logger.info(f"Starting pipeline for Client {request.client_id}...")

        # 0. LLM Intent Parsing: only when the caller gave a free-text
        # prompt and no explicit sector exclusions (explicit wins).
        if request.user_prompt and not request.excluded_sectors:
            logger.info(f"Parsing user intent: '{request.user_prompt}'")
            request.excluded_sectors = self.ai_reporter.parse_intent(request.user_prompt)
            logger.info(f"LLM Mapped Exclusions: {request.excluded_sectors}")

        # 1. Fetch Universe (S&P 500) and rank by market cap.
        tickers = self.data_engine.fetch_sp500_tickers()
        caps = self.data_engine.fetch_market_caps(tickers)
        valid_caps = {t: c for t, c in caps.items() if c > 0}
        sorted_by_cap = sorted(valid_caps, key=valid_caps.get)  # ascending cap
        # "Market Drivers" (Top 20 by cap) - essential for an S&P 500 proxy.
        market_drivers = sorted_by_cap[-20:]

        valid_tickers_for_fetch, targets = self._select_universe(
            tickers, sorted_by_cap, market_drivers, request
        )

        # 2. Get Market Data (only for the filtered subset); last ~2 years
        # of history for the covariance estimate.
        data = self.data_engine.fetch_market_data(
            valid_tickers_for_fetch, start_date="2023-01-01"
        )
        if data.empty:
            logger.error("No market data available. Aborting.")
            return None
        returns = data.pct_change().dropna()

        # 3. Compute Risk Model. The data fetch may have dropped tickers,
        # so re-derive the effective universe from the returns columns.
        valid_tickers = returns.columns.tolist()
        cov_matrix = self.risk_model.compute_covariance_matrix(returns)

        # 4. Benchmark proxy: cap-weight the *available* universe so both
        # portfolio and benchmark sum to 1.0 within the optimization
        # universe. For a "smallest N" strategy the drivers dominate the
        # benchmark while the portfolio is constrained away from them,
        # which correctly yields a large tracking error.
        subset_caps = {t: valid_caps.get(t, 1e9) for t in valid_tickers}
        total_subset_cap = sum(subset_caps.values())
        benchmark_weights = pd.Series(
            {t: subset_caps[t] / total_subset_cap for t in valid_tickers}
        )

        # 5. Optimize Portfolio. For "smallest" strategies the drivers are
        # in the data only to shape the benchmark, so exclude everything
        # outside the target set from the optimizer.
        sector_map = self.data_engine.get_sector_map()
        final_exclusions = list(request.excluded_tickers or [])
        if request.strategy == "smallest_market_cap" and targets is not None:
            target_set = set(targets)  # O(1) membership vs O(n) list scan
            final_exclusions.extend(t for t in valid_tickers if t not in target_set)
        # "largest_market_cap": drivers are typically already in the target
        # set, so no extra exclusions are needed.

        opt_result = self.optimizer.optimize_portfolio(
            covariance_matrix=cov_matrix,
            tickers=valid_tickers,
            benchmark_weights=benchmark_weights,
            sector_map=sector_map,
            excluded_sectors=request.excluded_sectors,
            excluded_tickers=final_exclusions,
            max_weight=request.max_weight,
        )
        if opt_result.status != "optimal":
            logger.warning("Optimization might be suboptimal.")

        # 6. Attribution Analysis over a simulated holding period: use the
        # last ~21 trading days (one month) of compounded returns.
        last_month = returns.iloc[-21:]
        asset_period_return = (1 + last_month).prod() - 1
        attribution = self.attribution_engine.generate_attribution_report(
            portfolio_weights=opt_result.weights,
            benchmark_weights=benchmark_weights.to_dict(),
            asset_returns=asset_period_return,
            sector_map=sector_map,
        )

        # 7. AI Reporting: combine all exclusions for the narrative.
        # Guard against None fields so string-join cannot raise TypeError.
        exclusions_list = (request.excluded_sectors or []) + (request.excluded_tickers or [])
        excluded = ", ".join(exclusions_list) if exclusions_list else "None"
        commentary = self.ai_reporter.generate_report(
            attribution_report=attribution,
            excluded_sector=excluded,
            tracking_error=opt_result.tracking_error,
        )

        return {
            "optimization": opt_result,
            "attribution": attribution,
            "commentary": commentary,
            "market_data": returns,
            "benchmark_weights": benchmark_weights,
            "sector_map": sector_map,
        }


if __name__ == "__main__":
    # Test Run
    req = OptimizationRequest(
        client_id="TEST_001",
        excluded_sectors=["Energy"],  # Typical ESG constraint
    )
    system = QuantScaleSystem()
    result = system.run_pipeline(req)
    if result:
        print("\n--- AI COMMENTARY ---\n")
        print(result['commentary'])
# Force HF Build