# QuantScaleAI / main.py
# Author: AJAY KASU
# Trigger rebuild on HF Space (commit 42e0a00)
import logging
import pandas as pd
from typing import Dict, Any
from config import settings
from data.data_manager import MarketDataEngine
from analytics.risk_model import RiskModel
from data.optimizer import PortfolioOptimizer
from analytics.tax_module import TaxEngine
from analytics.attribution import AttributionEngine
from ai.ai_reporter import AIReporter
from core.schema import OptimizationRequest, TickerData
# Setup Logging
# Level is taken from project settings (config.settings.LOG_LEVEL);
# format adds timestamp, logger name and severity to every record.
logging.basicConfig(level=settings.LOG_LEVEL, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("QuantScaleAI")
class QuantScaleSystem:
    """End-to-end portfolio construction pipeline.

    Wires together market data fetching, risk modelling, constrained
    optimization, performance attribution and LLM-generated commentary.
    """

    def __init__(self):
        # Project-local service components; constructed eagerly so a
        # misconfiguration fails at startup rather than mid-pipeline.
        self.data_engine = MarketDataEngine()
        self.risk_model = RiskModel()
        self.optimizer = PortfolioOptimizer()
        self.tax_engine = TaxEngine()
        self.attribution_engine = AttributionEngine()
        self.ai_reporter = AIReporter()

    def run_pipeline(self, request: OptimizationRequest):
        """Run the full optimization pipeline for one client request.

        Args:
            request: OptimizationRequest carrying client id, optional
                free-text prompt, strategy name, top-N size, exclusion
                lists and a max-weight cap.

        Returns:
            dict with keys ``optimization``, ``attribution``,
            ``commentary``, ``market_data``, ``benchmark_weights`` and
            ``sector_map``, or ``None`` when no market data is available.
        """
        logger.info(f"Starting pipeline for Client {request.client_id}...")

        # 0. LLM Intent Parsing: only when the caller gave a free-text
        # prompt and did NOT already specify explicit sector exclusions.
        if request.user_prompt and not request.excluded_sectors:
            logger.info(f"Parsing user intent: '{request.user_prompt}'")
            request.excluded_sectors = self.ai_reporter.parse_intent(request.user_prompt)
            logger.info(f"LLM Mapped Exclusions: {request.excluded_sectors}")

        # 1. Fetch Universe (S&P 500)
        tickers = self.data_engine.fetch_sp500_tickers()

        # OPTIMIZATION: Filter Universe BEFORE Fetching Data.
        # But we MUST fetch "Market Drivers" to define a realistic Benchmark,
        # otherwise TE is 0.0 because Benchmark == Portfolio Universe.
        caps = self.data_engine.fetch_market_caps(tickers)
        valid_caps = {t: c for t, c in caps.items() if c > 0}
        # Ascending by market cap: smallest first, largest last.
        sorted_by_cap = sorted(valid_caps.keys(), key=lambda t: valid_caps[t])

        # Define "Market Drivers" (Top 20) - Essential for S&P 500 Proxy.
        market_drivers = sorted_by_cap[-20:]

        valid_tickers_for_fetch = []
        if request.strategy and request.top_n:
            logger.info(f"Applying Strategy PRE-FETCH: {request.strategy} with Top N={request.top_n}")
            if request.strategy == "smallest_market_cap":
                targets = sorted_by_cap[:request.top_n]
                # We fetch Targets + Drivers (dedup via set).
                valid_tickers_for_fetch = list(set(targets + market_drivers))
                logger.info(f"Fetching {len(valid_tickers_for_fetch)} tickers (Targets + Drivers)")
            elif request.strategy == "largest_market_cap":
                targets = sorted_by_cap[-request.top_n:]
                valid_tickers_for_fetch = list(set(targets + market_drivers))
            else:
                # FIX: an unrecognized strategy previously left the fetch
                # list empty, so the pipeline aborted with a misleading
                # "No market data" error. Fall back to the demo universe.
                logger.warning(f"Unknown strategy '{request.strategy}'; falling back to default universe.")
                valid_tickers_for_fetch = tickers[:60]
        else:
            # Default safety limit for Demo
            valid_tickers_for_fetch = tickers[:60]

        # 2. Get Market Data (only for the filtered subset).
        # Fetch since 2023 so the covariance has a multi-year window.
        data = self.data_engine.fetch_market_data(valid_tickers_for_fetch, start_date="2023-01-01")
        if data.empty:
            logger.error("No market data available. Aborting.")
            return None
        returns = data.pct_change().dropna()

        # 3. Compute Risk Model.
        # The data fetch may have dropped tickers; align everything to the
        # columns that actually came back.
        valid_tickers = returns.columns.tolist()
        cov_matrix = self.risk_model.compute_covariance_matrix(returns)

        # 4. Benchmark Weights (realistic S&P 500 proxy).
        # Cap-weight the available subset. The Drivers dominate, so for a
        # "smallest N" strategy the benchmark is ~90% Drivers while the
        # portfolio is constrained to 0% Drivers -> large, correct TE.
        # Missing caps default to 1e9 (~small-cap floor) so weights stay finite.
        subset_caps = {t: valid_caps.get(t, 1e9) for t in valid_tickers}
        total_subset_cap = sum(subset_caps.values())
        benchmark_weights = pd.Series(
            {t: subset_caps[t] / total_subset_cap for t in valid_tickers},
            index=valid_tickers,
        )

        # 5. Optimize Portfolio.
        sector_map = self.data_engine.get_sector_map()
        # If the strategy targets only the smallest names, everything
        # outside the target set (i.e. the Drivers) must be excluded.
        final_exclusions = list(request.excluded_tickers)
        if request.strategy == "smallest_market_cap":
            target_set = set(sorted_by_cap[:request.top_n])  # O(1) membership
            final_exclusions.extend(t for t in valid_tickers if t not in target_set)
        # "largest_market_cap": Drivers are already in the target set, so
        # no extra exclusion is needed.

        opt_result = self.optimizer.optimize_portfolio(
            covariance_matrix=cov_matrix,
            tickers=valid_tickers,
            benchmark_weights=benchmark_weights,
            sector_map=sector_map,
            excluded_sectors=request.excluded_sectors,
            excluded_tickers=final_exclusions,
            max_weight=request.max_weight
        )
        if opt_result.status != "optimal":
            logger.warning("Optimization might be suboptimal.")

        # 6. Attribution Analysis (simulated performance).
        # Use the last ~21 trading days (one month) of compounded returns.
        last_month = returns.iloc[-21:]
        asset_period_return = (1 + last_month).prod() - 1
        attribution = self.attribution_engine.generate_attribution_report(
            portfolio_weights=opt_result.weights,
            benchmark_weights=benchmark_weights.to_dict(),
            asset_returns=asset_period_return,
            sector_map=sector_map
        )

        # 7. AI Reporting — combine all exclusions for the narrative.
        # NOTE(review): assumes excluded_sectors/excluded_tickers default to
        # lists (not None) in the schema — verify against core.schema.
        exclusions_list = request.excluded_sectors + request.excluded_tickers
        excluded = ", ".join(exclusions_list) if exclusions_list else "None"
        commentary = self.ai_reporter.generate_report(
            attribution_report=attribution,
            excluded_sector=excluded,
            tracking_error=opt_result.tracking_error
        )

        return {
            "optimization": opt_result,
            "attribution": attribution,
            "commentary": commentary,
            "market_data": returns,
            "benchmark_weights": benchmark_weights,
            "sector_map": sector_map
        }
if __name__ == "__main__":
    # Smoke test: run the pipeline once with a typical ESG constraint.
    demo_request = OptimizationRequest(
        client_id="TEST_001",
        excluded_sectors=["Energy"],  # Typical ESG constraint
    )
    pipeline_result = QuantScaleSystem().run_pipeline(demo_request)
    if pipeline_result is not None:
        print("\n--- AI COMMENTARY ---\n")
        print(pipeline_result['commentary'])
# Force HF Build