import asyncio from dataclasses import dataclass from dataclasses import replace from datetime import date, timedelta from decimal import Decimal, InvalidOperation import re from typing import Any import numpy as np from App.analysis.volatility import calculate_annualized_volatility from App.routers.bonds.models import Bond from App.routers.economy.valuation import get_bond_pricing_context from App.routers.funds.models import FundPerformance, MutualFund from App.routers.stocks.metrics import calculate_metrics, calculate_liquidity_metrics from App.routers.stocks.models import Stock, StockPriceData from App.routers.stocks.seed import load_dse_transaction_fee_seed_data from App.routers.tasks.models import ImportTask, TaskStatus GROWTH_CLASS_LIMITS = { "STOCK": (0.30, 0.70), "FUND": (0.10, 0.45), "BOND": (0.05, 0.35), } MAX_ASSET_WEIGHT = 0.35 RISK_PROFILES = { "conservative": { "label": "Conservative", "class_limits": {"STOCK": (0.00, 0.35), "FUND": (0.25, 0.70), "BOND": (0.15, 0.60)}, "max_asset_weight": 0.25, "return_weight": 0.55, "volatility_penalty": 1.35, "income_weight": 0.45, }, "balanced": { "label": "Balanced", "class_limits": GROWTH_CLASS_LIMITS, "max_asset_weight": MAX_ASSET_WEIGHT, "return_weight": 0.85, "volatility_penalty": 0.85, "income_weight": 0.25, }, "growth": { "label": "Growth", "class_limits": {"STOCK": (0.30, 0.75), "FUND": (0.05, 0.45), "BOND": (0.00, 0.25)}, "max_asset_weight": 0.40, "return_weight": 1.15, "volatility_penalty": 0.45, "income_weight": 0.10, }, } DEFAULT_RISK_FREE_RATE = 0.12 DEFAULT_SIMULATIONS = 6000 HIGH_COUPON_ADVANTAGE_BPS = 150 LEGACY_BOND_REPLACEMENT_MARGIN = 0.06 LOW_MODERATE_VOLATILITY_MAX = 0.16 MIN_REPLACEMENT_HISTORY_YEARS = 3.0 @dataclass class OptimizerAsset: key: str asset_id: int asset_type: str symbol: str name: str quantity: float current_price: float current_value: float expected_return: float volatility: float income_yield: float fee_rate: float fundamentals: dict[str, Any] | None = None liquidity: dict[str, Any] | None = None is_candidate: bool = False def _blend_pair(start: tuple[float, float], end: tuple[float, float], t: float) -> tuple[float, float]: return ( start[0] + (end[0] - start[0]) * t, start[1] + (end[1] - start[1]) * t, ) def _risk_settings(risk_profile: str | None, risk_score: int | None = None) -> dict[str, Any]: if risk_score is not None: score = max(0, min(100, int(risk_score))) t = score / 100 low = RISK_PROFILES["conservative"] high = RISK_PROFILES["growth"] if score < 34: label = "Conservative" elif score > 66: label = "Aggressive" else: label = "Balanced" return { "key": "conservative" if score < 34 else "growth" if score > 66 else "balanced", "label": label, "risk_score": score, "class_limits": { asset_type: _blend_pair(low["class_limits"][asset_type], high["class_limits"][asset_type], t) for asset_type in GROWTH_CLASS_LIMITS }, "max_asset_weight": low["max_asset_weight"] + (high["max_asset_weight"] - low["max_asset_weight"]) * t, "return_weight": low["return_weight"] + (high["return_weight"] - low["return_weight"]) * t, "volatility_penalty": low["volatility_penalty"] + (high["volatility_penalty"] - low["volatility_penalty"]) * t, "income_weight": low["income_weight"] + (high["income_weight"] - low["income_weight"]) * t, } key = str(risk_profile or "balanced").strip().lower() if key in {"low", "safe", "safer", "defensive"}: key = "conservative" elif key in {"high", "aggressive"}: key = "growth" settings = RISK_PROFILES.get(key, RISK_PROFILES["balanced"]) fallback_score = 15 if key == "conservative" else 85 if key == "growth" else 50 return {"key": key if key in RISK_PROFILES else "balanced", "risk_score": fallback_score, **settings} def _safe_float(value: Any, default: float = 0.0) -> float: if value in (None, ""): return default try: return float(value) except (TypeError, ValueError, InvalidOperation): return default def _annualized_return(values: list[tuple[date, float]]) -> float | None: ordered = [(d, v) for d, v in sorted(values, key=lambda item: item[0]) if v > 0] if len(ordered) < 2: return None start_date, start_value = ordered[0] end_date, end_value = ordered[-1] days = max((end_date - start_date).days, 1) total_return = (end_value / start_value) - 1 return (1 + total_return) ** (365 / days) - 1 def _history_years(values: list[tuple[date, float]]) -> float: ordered = [item for item in sorted(values, key=lambda item: item[0]) if item[1] > 0] if len(ordered) < 2: return 0.0 return max((ordered[-1][0] - ordered[0][0]).days / 365.25, 0.0) def _stock_fee_band(consideration: float) -> dict[str, Any] | None: payload = load_dse_transaction_fee_seed_data() for band in payload.get("bands", []): min_value = _safe_float(band.get("min_consideration")) max_raw = band.get("max_consideration") max_value = float("inf") if max_raw is None else _safe_float(max_raw) if consideration > min_value and consideration <= max_value: return band return None def _stock_fee_rate(consideration: float) -> float: band = _stock_fee_band(consideration) if band: return _safe_float(band.get("total_cost_to_investor")) / 100 return 0.0206 def _parse_percent(raw_value: Any) -> float: if raw_value is None: return 0.0 text = str(raw_value) digits = "".join(ch for ch in text if ch.isdigit() or ch == ".") return _safe_float(digits) / 100 if digits else 0.0 def _parse_distribution_rate(raw_value: Any) -> float: text = str(raw_value or "") matches = re.findall(r"(\d+(?:\.\d+)?)\s*%", text) if not matches: return 0.0 return max(_safe_float(match) / 100 for match in matches) def _parse_money_value(raw_value: Any) -> float | None: if raw_value in (None, ""): return None digits = "".join(ch for ch in str(raw_value) if ch.isdigit() or ch == ".") return _safe_float(digits) if digits else None def _parse_fee_rate(raw_value: Any) -> float: text = str(raw_value or "").lower() if not text or "none" in text or "nil" in text: return 0.0 match = re.search(r"(\d+(?:\.\d+)?)\s*%", text) return _safe_float(match.group(1)) / 100 if match else 0.0 async def _fund_prospectus_terms(fund: MutualFund) -> dict[str, Any]: try: info = await fund.info_record except Exception: info = None raw_data = info.raw_data if info and isinstance(info.raw_data, dict) else {} liquidity = raw_data.get("liquidity") if isinstance(raw_data.get("liquidity"), dict) else {} contributions = raw_data.get("contributions") if isinstance(raw_data.get("contributions"), dict) else {} documents = raw_data.get("documents") if isinstance(raw_data.get("documents"), list) else [] min_initial = ( _parse_money_value(raw_data.get("min_initial")) or _parse_money_value(fund.min_initial) or _parse_money_value(contributions.get("min_initial")) ) min_additional = ( _parse_money_value(raw_data.get("min_additional")) or _parse_money_value(fund.min_additional) or _parse_money_value(contributions.get("min_additional")) ) min_redemption = ( _parse_money_value(raw_data.get("min_redemption")) or _parse_money_value(liquidity.get("min_redemption")) ) lock_in_days = raw_data.get("lock_in_days") or liquidity.get("lock_in_days") redemption_days = fund.redemption_days or raw_data.get("redemption_days") or liquidity.get("redemption_days") exit_fee_rate = _parse_fee_rate(fund.exit_load or raw_data.get("exit_load") or liquidity.get("exit_fee")) entry_fee_rate = _safe_float(fund.entry_load) / 100 return { "prospectus_backed": bool(raw_data or documents), "prospectus_documents": [ item.get("path") or item.get("name") for item in documents if isinstance(item, dict) and (item.get("path") or item.get("name")) ], "min_initial": min_initial, "min_additional": min_additional, "min_redemption": min_redemption, "lock_in_days": int(lock_in_days) if str(lock_in_days or "").isdigit() else None, "redemption_days": int(redemption_days) if str(redemption_days or "").isdigit() else redemption_days, "entry_fee_rate": entry_fee_rate, "exit_fee_rate": exit_fee_rate, "exit_load": fund.exit_load, "fund_type": fund.fund_type, "currency": fund.currency, } def _fundamental_adjusted_stock_return( base_return: float, dividend_yield: float, fundamentals: dict[str, Any], liquidity: dict[str, Any], ) -> float: pe_ratio = _safe_float(fundamentals.get("pe_ratio")) pb_ratio = _safe_float(fundamentals.get("pb_ratio")) eps = fundamentals.get("eps") payout_ratio = _safe_float(fundamentals.get("payout_ratio")) debt_to_equity = _safe_float(fundamentals.get("debt_to_equity")) liquidity_score = _safe_float(liquidity.get("liquidity_score")) adjusted = base_return if pe_ratio > 0: earnings_yield = 1 / pe_ratio adjusted = (adjusted * 0.75) + ((earnings_yield + dividend_yield) * 0.25) if pe_ratio > 30: adjusted -= 0.035 elif pe_ratio > 22: adjusted -= 0.015 elif pe_ratio < 10: adjusted += 0.01 elif eps not in (None, ""): adjusted -= 0.07 else: adjusted -= 0.015 if pb_ratio > 0: if pb_ratio > 5: adjusted -= 0.05 elif pb_ratio > 3: adjusted -= 0.025 elif pb_ratio <= 2: adjusted += 0.01 if payout_ratio > 100: adjusted -= 0.025 if debt_to_equity > 150: adjusted -= 0.015 if liquidity_score < 25: adjusted -= 0.05 elif liquidity_score < 45: adjusted -= 0.025 elif liquidity_score > 75: adjusted += 0.01 return adjusted async def _fund_income_yield(fund: MutualFund) -> float: if not fund.pays_income: return 0.0 try: info = await fund.info_record except Exception: info = None data = info.raw_data if info and isinstance(info.raw_data, dict) else {} distribution = data.get("distribution") if isinstance(data.get("distribution"), dict) else {} other_facts = data.get("other_facts") if isinstance(data.get("other_facts"), dict) else {} candidates = [ fund.income_amount, distribution.get("max_distribution_rate"), distribution.get("policy"), other_facts.get("max_distribution_rate"), ] return min(max((_parse_distribution_rate(value) for value in candidates), default=0.0), 0.18) async def _collect_positions(portfolio_id: int) -> list[dict[str, Any]]: from App.routers.portfolio.service import PortfolioService positions = await PortfolioService.get_positions(portfolio_id) return [ { "asset_type": str(position["asset_type"]).upper(), "asset_id": position["asset_id"], "quantity": Decimal(str(position["quantity"])), } for position in positions if _safe_float(position.get("current_value")) > 0 ] async def _build_stock_asset(position: dict[str, Any]) -> OptimizerAsset | None: stock = await Stock.get_or_none(id=position["asset_id"]) if stock is None: return None prices = await StockPriceData.filter( stock_id=stock.id, date__gte=date.today() - timedelta(days=365 * 5 + 7), ).order_by("date").values("date", "closing_price") price_values = [(row["date"], _safe_float(row["closing_price"])) for row in prices] latest = await StockPriceData.filter(stock=stock).order_by("-date").first() if latest is None: return None metrics = await calculate_metrics(stock) or {} capital_return = _annualized_return(price_values) dividend_yield = _safe_float(metrics.get("dividend_yield")) / 100 liquidity = metrics.get("liquidity") or await calculate_liquidity_metrics(stock) liquidity["history_years"] = round(_history_years(price_values), 2) fundamentals = { "eps": metrics.get("eps"), "pe_ratio": metrics.get("pe_ratio"), "pb_ratio": metrics.get("pb_ratio") or metrics.get("price_to_book"), "ps_ratio": metrics.get("ps_ratio"), "earnings_yield": metrics.get("earnings_yield"), "payout_ratio": metrics.get("payout_ratio"), "gross_margin": metrics.get("gross_margin"), "net_margin": metrics.get("net_margin"), "debt_to_equity": metrics.get("debt_to_equity"), "source_name": metrics.get("source_name"), "source_url": metrics.get("source_url"), "data_updated_at": metrics.get("data_updated_at"), } expected_return = _fundamental_adjusted_stock_return( base_return=(capital_return if capital_return is not None else 0.10) + dividend_yield, dividend_yield=dividend_yield, fundamentals=fundamentals, liquidity=liquidity, ) volatility_result = calculate_annualized_volatility(price_values) volatility = ( volatility_result.annualized_volatility if volatility_result else 0.28 ) current_price = _safe_float(latest.closing_price) quantity = _safe_float(position["quantity"]) current_value = quantity * current_price return OptimizerAsset( key=f"STOCK:{stock.id}", asset_id=stock.id, asset_type="STOCK", symbol=stock.symbol, name=stock.name, quantity=quantity, current_price=current_price, current_value=current_value, expected_return=max(min(expected_return, 0.60), -0.30), volatility=max(volatility, 0.08), income_yield=max(dividend_yield, 0.0), fee_rate=_stock_fee_rate(current_value), fundamentals=fundamentals, liquidity=liquidity, ) async def _build_fund_asset(position: dict[str, Any]) -> OptimizerAsset | None: fund = await MutualFund.get_or_none(id=position["asset_id"]) if fund is None: return None rows = await FundPerformance.filter( fund_id=fund.id, record_date__gte=date.today() - timedelta(days=365 * 5 + 7), ).order_by("record_date").values("record_date", "nav_per_unit") nav_values = [ (row["record_date"], _safe_float(row["nav_per_unit"])) for row in rows if row.get("nav_per_unit") is not None ] latest = await FundPerformance.filter(fund=fund).order_by("-record_date").first() if latest is None or latest.nav_per_unit is None: return None income_yield = await _fund_income_yield(fund) expected_return = (_annualized_return(nav_values) or 0.13) + income_yield volatility_result = calculate_annualized_volatility(nav_values) volatility = ( volatility_result.annualized_volatility if volatility_result else 0.08 ) current_price = _safe_float(latest.nav_per_unit) quantity = _safe_float(position["quantity"]) current_value = quantity * current_price prospectus_terms = await _fund_prospectus_terms(fund) redemption_days = prospectus_terms.get("redemption_days") liquidity_score = 82 if isinstance(redemption_days, int) and redemption_days <= 7 else 65 if prospectus_terms.get("lock_in_days"): liquidity_score -= min(int(prospectus_terms["lock_in_days"]) / 180 * 30, 30) if not prospectus_terms.get("prospectus_backed"): liquidity_score -= 12 if str(prospectus_terms.get("currency") or "TZS").upper() != "TZS": liquidity_score -= 18 return OptimizerAsset( key=f"FUND:{fund.id}", asset_id=fund.id, asset_type="FUND", symbol=fund.name[:10].upper(), name=fund.name, quantity=quantity, current_price=current_price, current_value=current_value, expected_return=max(min(expected_return, 0.35), 0.03), volatility=max(volatility, 0.03), income_yield=income_yield, fee_rate=prospectus_terms["entry_fee_rate"], liquidity={ **prospectus_terms, "liquidity_score": round(max(liquidity_score, 20), 2), "history_years": round(_history_years(nav_values), 2), }, ) async def _build_bond_asset(position: dict[str, Any]) -> OptimizerAsset | None: bond = await Bond.get_or_none(id=position["asset_id"]) if bond is None: return None pricing_context = await get_bond_pricing_context(bond) price_per_100 = _safe_float(pricing_context.get("price_percent"), 100.0) current_price = price_per_100 / 100 quantity = _safe_float(position["quantity"]) current_value = quantity * current_price coupon_rate = _safe_float(bond.coupon_rate) / 100 latest_yield_percent = pricing_context.get("latest_yield_percent") market_yield = ( _safe_float(latest_yield_percent) / 100 if latest_yield_percent not in (None, "") else 0.0 ) expected_return = market_yield if market_yield > 0 else max(coupon_rate, 0.10) income_yield = coupon_rate / current_price if current_price > 0 else coupon_rate return OptimizerAsset( key=f"BOND:{bond.id}", asset_id=bond.id, asset_type="BOND", symbol=bond.isin, name=f"{bond.maturity_years} Yr Treasury Bond", quantity=quantity, current_price=current_price, current_value=current_value, expected_return=expected_return, volatility=0.04, income_yield=income_yield, fee_rate=0.0, liquidity={ "liquidity_score": pricing_context.get("liquidity_score", 45), "valuation_quality": pricing_context.get("valuation_quality"), "execution_caveat": pricing_context.get("execution_caveat"), "latest_trade_date": pricing_context.get("latest_trade_date"), "price_source": pricing_context.get("price_source"), "latest_yield_percent": latest_yield_percent, "coupon_rate_percent": _safe_float(bond.coupon_rate), "price_percent": price_per_100, "primary_reinvestment_yield_percent": ( _safe_float((pricing_context.get("comparable_primary_market") or [{}])[0].get("implied_yield_percent")) if pricing_context.get("comparable_primary_market") else None ), "same_bond_secondary_available": pricing_context.get("same_bond_secondary_available"), "comparable_secondary_market": pricing_context.get("comparable_secondary_market") or [], "comparable_median_yield_percent": pricing_context.get("comparable_median_yield_percent"), "comparable_primary_market": pricing_context.get("comparable_primary_market") or [], }, ) async def _build_assets(portfolio_id: int) -> list[OptimizerAsset]: assets: list[OptimizerAsset] = [] for position in await _collect_positions(portfolio_id): asset_type = position["asset_type"] if asset_type == "STOCK": asset = await _build_stock_asset(position) elif asset_type in {"FUND", "UTT"}: asset = await _build_fund_asset(position) elif asset_type == "BOND": asset = await _build_bond_asset(position) else: asset = None if asset and asset.current_value > 0: assets.append(asset) return assets def _asset_candidate_score(asset: OptimizerAsset, settings: dict[str, Any]) -> float: liquidity_score = _safe_float((asset.liquidity or {}).get("liquidity_score")) / 100 valuation_penalty = 0.0 if asset.asset_type == "STOCK": pe_ratio = _safe_float((asset.fundamentals or {}).get("pe_ratio")) pb_ratio = _safe_float((asset.fundamentals or {}).get("pb_ratio")) if pe_ratio > 30 or pb_ratio > 5: valuation_penalty += 0.05 if pe_ratio < 0: valuation_penalty += 0.08 return ( asset.expected_return * settings["return_weight"] + asset.income_yield * settings["income_weight"] + liquidity_score * 0.08 - asset.volatility * settings["volatility_penalty"] - valuation_penalty ) async def _build_new_candidate_assets( current_assets: list[OptimizerAsset], risk_profile: str | None, risk_score: int | None, limit: int = 8, ) -> list[OptimizerAsset]: current_keys = {asset.key for asset in current_assets} settings = _risk_settings(risk_profile, risk_score) candidates: list[OptimizerAsset] = [] funds = await MutualFund.filter(status="Active").all() for fund in funds: if f"FUND:{fund.id}" in current_keys: continue asset = await _build_fund_asset({"asset_id": fund.id, "quantity": Decimal("1")}) if asset: if not (asset.liquidity or {}).get("prospectus_backed"): continue if str((asset.liquidity or {}).get("currency") or "TZS").upper() != "TZS": continue candidates.append(replace(asset, quantity=0.0, current_value=0.0, is_candidate=True)) stocks = await Stock.all() for stock in stocks: if f"STOCK:{stock.id}" in current_keys: continue asset = await _build_stock_asset({"asset_id": stock.id, "quantity": Decimal("1")}) if not asset: continue max_buy_value = _safe_float((asset.liquidity or {}).get("max_buy_value_20d")) if max_buy_value <= 0 and _safe_float((asset.liquidity or {}).get("liquidity_score")) < 20: continue candidates.append(replace(asset, quantity=0.0, current_value=0.0, is_candidate=True)) return sorted( candidates, key=lambda asset: _asset_candidate_score(asset, settings), reverse=True, )[:limit] async def _stock_candidate(stock: Stock) -> dict[str, Any] | None: asset = await _build_stock_asset({"asset_id": stock.id, "quantity": Decimal("1")}) if asset is None: return None return { "asset_id": asset.asset_id, "asset_type": asset.asset_type, "symbol": asset.symbol, "name": asset.name, "expected_return": round(asset.expected_return, 4), "volatility": round(asset.volatility, 4), "income_yield": round(asset.income_yield, 4), "liquidity": asset.liquidity, "fundamentals": asset.fundamentals, "reason": "Stock candidate with fundamentals, dividend history, and observed DSE liquidity.", } async def _fund_candidate(fund: MutualFund) -> dict[str, Any] | None: asset = await _build_fund_asset({"asset_id": fund.id, "quantity": Decimal("1")}) if asset is None: return None return { "asset_id": asset.asset_id, "asset_type": asset.asset_type, "symbol": asset.symbol, "name": asset.name, "expected_return": round(asset.expected_return, 4), "volatility": round(asset.volatility, 4), "income_yield": round(asset.income_yield, 4), "pays_income": asset.income_yield > 0, "liquidity": asset.liquidity, "reason": ( "Income-paying fund candidate with lower measured volatility." if asset.income_yield > 0 else "Fund candidate with lower measured volatility." ), } async def _suggest_alternatives(current_assets: list[OptimizerAsset], risk_profile: str | None, risk_score: int | None) -> list[dict[str, Any]]: current_keys = {asset.key for asset in current_assets} settings = _risk_settings(risk_profile, risk_score) candidates: list[dict[str, Any]] = [] funds = await MutualFund.filter(status="Active").all() for fund in funds: if f"FUND:{fund.id}" in current_keys: continue candidate = await _fund_candidate(fund) if candidate: candidates.append(candidate) stocks = await Stock.all() for stock in stocks: if f"STOCK:{stock.id}" in current_keys: continue candidate = await _stock_candidate(stock) if candidate: candidates.append(candidate) if not candidates: return [] current_volatility = max((asset.volatility for asset in current_assets), default=0.0) max_volatility = current_volatility * (0.85 if settings["key"] == "conservative" else 1.10) safer = [item for item in candidates if item["volatility"] <= max_volatility] or candidates def score(item: dict[str, Any]) -> float: return ( item["expected_return"] * settings["return_weight"] + item.get("income_yield", 0.0) * settings["income_weight"] - item["volatility"] * settings["volatility_penalty"] ) return sorted(safer, key=score, reverse=True)[:5] def _is_igrowth(asset: OptimizerAsset) -> bool: text = f"{asset.symbol} {asset.name}".upper() return "IGROWTH" in text or ("ITRUST" in text and "GROWTH" in text) async def _igrowth_proxy_context(assets: list[OptimizerAsset], total_value: float) -> dict[str, Any] | None: igrowth_assets = [asset for asset in assets if _is_igrowth(asset)] if not igrowth_assets: return None stock_candidates: list[OptimizerAsset] = [] for stock in await Stock.all(): asset = await _build_stock_asset({"asset_id": stock.id, "quantity": Decimal("1")}) if asset is None: continue liquidity_score = _safe_float((asset.liquidity or {}).get("liquidity_score")) max_buy_value = _safe_float((asset.liquidity or {}).get("max_buy_value_20d")) if liquidity_score >= 35 and max_buy_value >= total_value * 0.02: stock_candidates.append(asset) if not stock_candidates: return { "method": "top-liquid-stock proxy", "available": False, "message": "IGROWTH holdings were not available and no liquid stock proxy basket passed the volume screen.", } top = sorted( stock_candidates, key=lambda asset: _asset_candidate_score(asset, _risk_settings("growth", None)), reverse=True, )[:5] weights = np.array([ max(_safe_float((asset.liquidity or {}).get("liquidity_score")), 1.0) for asset in top ]) weights = weights / weights.sum() proxy_return = float(np.dot(weights, np.array([asset.expected_return for asset in top]))) proxy_capacity = sum(_safe_float((asset.liquidity or {}).get("max_buy_value_20d")) for asset in top) notes = [] for asset in igrowth_assets: asset.expected_return = min(asset.expected_return + 0.01, 0.35) gap = proxy_return - asset.expected_return if gap > 0.05 and proxy_capacity >= asset.current_value * 0.25: penalty = min(0.04, gap * 0.25) asset.expected_return = max(asset.expected_return - penalty, 0.03) notes.append( f"{asset.symbol} underperformed the liquid-stock proxy, but keeps a liquidity/diversification credit." ) return { "method": "top-liquid-stock proxy", "available": True, "igrowth_symbols": [asset.symbol for asset in igrowth_assets], "proxy_expected_return": round(proxy_return, 4), "proxy_capacity_20d": round(proxy_capacity, 2), "proxy_assets": [ { "symbol": asset.symbol, "name": asset.name, "expected_return": round(asset.expected_return, 4), "volatility": round(asset.volatility, 4), "liquidity_score": round(_safe_float((asset.liquidity or {}).get("liquidity_score")), 2), "max_buy_value_20d": round(_safe_float((asset.liquidity or {}).get("max_buy_value_20d")), 2), } for asset in top ], "notes": notes or [ "IGROWTH is treated as a liquid/diversified wrapper and compared against a volume-screened top-stock proxy." ], } def _portfolio_stats(weights: np.ndarray, returns: np.ndarray, vols: np.ndarray) -> dict: portfolio_return = float(np.dot(weights, returns)) portfolio_vol = float(np.sqrt(np.dot(weights**2, vols**2))) sharpe = ( (portfolio_return - DEFAULT_RISK_FREE_RATE) / portfolio_vol if portfolio_vol > 0 else 0.0 ) return { "expected_return": portfolio_return, "volatility": portfolio_vol, "sharpe": sharpe, } def _class_totals(weights: np.ndarray, assets: list[OptimizerAsset]) -> dict[str, float]: totals = {"STOCK": 0.0, "FUND": 0.0, "BOND": 0.0} for weight, asset in zip(weights, assets): totals[asset.asset_type] = totals.get(asset.asset_type, 0.0) + float(weight) return totals def _respects_limits(weights: np.ndarray, assets: list[OptimizerAsset], settings: dict[str, Any]) -> bool: effective_max_asset_weight = max(float(settings["max_asset_weight"]), 1 / len(assets)) if np.any(weights < 0) or np.any(weights > effective_max_asset_weight): return False available_classes = {asset.asset_type for asset in assets} if len(available_classes) == 1: return True class_totals = _class_totals(weights, assets) class_limits = settings["class_limits"] for asset_type in available_classes: total = class_totals.get(asset_type, 0.0) min_weight, max_weight = class_limits[asset_type] if total < min_weight or total > max_weight: return False return True def _estimate_rebalance_fees( current_weights: np.ndarray, suggested_weights: np.ndarray, assets: list[OptimizerAsset], total_value: float, ) -> float: return sum(item["estimated_fee"] for item in _fee_breakdown(current_weights, suggested_weights, assets, total_value)) def _liquidity_trade_warnings( current_weights: np.ndarray, suggested_weights: np.ndarray, assets: list[OptimizerAsset], total_value: float, ) -> list[str]: warnings = [] for current, suggested, asset in zip(current_weights, suggested_weights, assets): if asset.asset_type != "STOCK" or suggested <= current: continue trade_amount = float(suggested - current) * total_value max_buy_value = _safe_float((asset.liquidity or {}).get("max_buy_value_20d")) if max_buy_value <= 0: warnings.append(f"{asset.symbol} has no recent turnover capacity, so new buys should be treated as speculative.") elif trade_amount > max_buy_value: warnings.append( f"{asset.symbol} buy is capped by observed volume: suggested demand exceeds about 10% of 20 trading days of average turnover." ) return warnings def _liquidity_capacity_penalty( current_weights: np.ndarray, suggested_weights: np.ndarray, assets: list[OptimizerAsset], total_value: float, ) -> float: penalty = 0.0 for current, suggested, asset in zip(current_weights, suggested_weights, assets): if suggested <= current: continue trade_amount = float(suggested - current) * total_value if asset.asset_type == "STOCK": max_buy_value = _safe_float((asset.liquidity or {}).get("max_buy_value_20d")) if max_buy_value <= 0: penalty += 0.25 continue if trade_amount > max_buy_value: penalty += min((trade_amount / max_buy_value - 1) * 0.08, 0.35) elif asset.asset_type == "FUND": terms = asset.liquidity or {} minimum = ( _safe_float(terms.get("min_initial")) if current <= 0 else _safe_float(terms.get("min_additional")) ) if minimum and trade_amount < minimum: penalty += 0.18 if not terms.get("prospectus_backed"): penalty += 0.08 if _safe_float(terms.get("lock_in_days")) > 0: penalty += min(_safe_float(terms.get("lock_in_days")) / 365 * 0.08, 0.08) return penalty def _apply_liquidity_caps( current_weights: np.ndarray, suggested_weights: np.ndarray, assets: list[OptimizerAsset], total_value: float, settings: dict[str, Any], ) -> tuple[np.ndarray, list[str]]: capped = suggested_weights.copy() warnings = [] excess = 0.0 for index, (current, suggested, asset) in enumerate(zip(current_weights, capped, assets)): if suggested <= current: continue if asset.asset_type == "STOCK": max_buy_value = _safe_float((asset.liquidity or {}).get("max_buy_value_20d")) if max_buy_value <= 0: max_weight = float(current) else: max_weight = float(current) + (max_buy_value / total_value if total_value else 0.0) if suggested > max_weight: excess += float(suggested - max_weight) capped[index] = max_weight warnings.append( f"{asset.symbol} increase was capped because DSE volume may not support the full trade size." ) elif asset.asset_type == "FUND": trade_amount = float(suggested - current) * total_value terms = asset.liquidity or {} minimum = ( _safe_float(terms.get("min_initial")) if current <= 0 else _safe_float(terms.get("min_additional")) ) if minimum and trade_amount < minimum: excess += float(capped[index] - current) capped[index] = 0.0 if current <= 0 else current warnings.append( f"{asset.symbol} buy was removed because it is below the prospectus minimum investment/addition." ) if excess <= 0: return capped, warnings effective_max_asset_weight = max(float(settings["max_asset_weight"]), 1 / len(assets)) receiver_order = {"FUND": 0, "BOND": 1, "STOCK": 2} def can_receive(index: int, asset: OptimizerAsset) -> bool: room = max(effective_max_asset_weight - float(capped[index]), 0.0) if room <= 0.0001: return False if asset.asset_type != "FUND": return True terms = asset.liquidity or {} minimum = ( _safe_float(terms.get("min_initial")) if capped[index] <= 0 else _safe_float(terms.get("min_additional")) ) return not minimum or room * total_value >= minimum receivers = sorted( [ (index, asset) for index, asset in enumerate(assets) if can_receive(index, asset) ], key=lambda item: (receiver_order.get(item[1].asset_type, 9), -item[1].expected_return), ) for index, _asset in receivers: if excess <= 0: break room = max(effective_max_asset_weight - float(capped[index]), 0.0) addition = min(room, excess) capped[index] += addition excess -= addition if capped.sum() > 0: capped = capped / capped.sum() return capped, warnings def _fee_breakdown( current_weights: np.ndarray, suggested_weights: np.ndarray, assets: list[OptimizerAsset], total_value: float, ) -> list[dict[str, Any]]: items = [] for current, suggested, asset in zip(current_weights, suggested_weights, assets): difference = float(suggested - current) trade_amount = abs(difference) * total_value if trade_amount <= 0: continue side = "BUY" if difference > 0 else "SELL" fee_rate = 0.0 fee_source = "No explicit transaction fee" fee_components: dict[str, float] = {} if asset.asset_type == "STOCK": band = _stock_fee_band(trade_amount) if band: fee_components = { "brokerage_commission": round(trade_amount * (_safe_float(band.get("brokerage_commission")) / 100), 2), "transaction_fee_cmsa": round(trade_amount * (_safe_float(band.get("transaction_fee_cmsa")) / 100), 2), "transaction_fee_dse": round(trade_amount * (_safe_float(band.get("transaction_fee_dse")) / 100), 2), "fidelity_fee": round(trade_amount * (_safe_float(band.get("fidelity_fee")) / 100), 2), "csd_fee": round(trade_amount * (_safe_float(band.get("csd_fee")) / 100), 2), } fee_rate = _safe_float(band.get("total_cost_to_investor")) / 100 fee_source = f"DSE equity fee band: {band.get('label')}" else: fee_rate = asset.fee_rate fee_source = "DSE equity fee fallback" elif asset.asset_type == "FUND" and side == "BUY": fee_rate = asset.fee_rate fee_source = "Fund entry load" elif asset.asset_type == "FUND" and side == "SELL": fee_rate = _safe_float((asset.liquidity or {}).get("exit_fee_rate")) fee_source = "Fund exit load from prospectus metadata" if fee_rate <= 0: continue fee = trade_amount * fee_rate items.append( { "asset_id": asset.asset_id, "asset_type": asset.asset_type, "symbol": asset.symbol, "name": asset.name, "side": side, "trade_amount": round(trade_amount, 2), "buy_amount": round(trade_amount if side == "BUY" else 0.0, 2), "sell_amount": round(trade_amount if side == "SELL" else 0.0, 2), "fee_rate": round(fee_rate, 6), "estimated_fee": round(fee, 2), "fee_components": fee_components, "fee_source": fee_source, } ) return sorted(items, key=lambda item: item["estimated_fee"], reverse=True) def _fee_summary(fee_items: list[dict[str, Any]]) -> dict[str, Any]: by_type: dict[str, float] = {} for item in fee_items: by_type[item["asset_type"]] = by_type.get(item["asset_type"], 0.0) + item["estimated_fee"] total = sum(by_type.values()) return { "total": round(total, 2), "by_asset_type": {key: round(value, 2) for key, value in by_type.items()}, "items": fee_items, } def _advisor_comment( current_stats: dict[str, float], suggested_stats: dict[str, float], allocations: list[dict[str, Any]], estimated_fees: float, risk_label: str, ) -> dict[str, Any]: increases = [item for item in allocations if item["difference"] > 0.03] reductions = [item for item in allocations if item["difference"] < -0.03] top_increase = increases[0] if increases else None new_buys = [item for item in increases if item.get("is_new_candidate")] top_reduction = sorted(reductions, key=lambda item: item["difference"])[0] if reductions else None sharpe_delta = suggested_stats["sharpe"] - current_stats["sharpe"] return_delta = suggested_stats["expected_return"] - current_stats["expected_return"] summary = ( f"The suggested allocation is tuned for a {risk_label.lower()} risk setting while trying " "to avoid one position carrying too much of the risk." ) if sharpe_delta > 0.05: summary = ( "The suggested allocation looks stronger on a risk-adjusted basis: it " "improves the expected Sharpe score while keeping the portfolio growth-focused." ) elif return_delta > 0: summary = ( "The suggested allocation mainly improves expected growth, but the risk-adjusted " "improvement is modest, so rebalance gradually rather than all at once." ) actions = [] if top_increase: if top_increase.get("is_new_candidate"): actions.append( f"Buy {top_increase['symbol']} toward {top_increase['suggested_weight'] * 100:.1f}% because it passed the return, valuation, fee, and liquidity screens." ) else: actions.append( f"Increase {top_increase['symbol']} toward {top_increase['suggested_weight'] * 100:.1f}% because its growth return still looks attractive after estimated fees." ) if new_buys and (not top_increase or not top_increase.get("is_new_candidate")): actions.append( "Add selected new stock/fund candidates only where observed trading volume can support the buy size." ) if top_reduction: actions.append( f"Reduce {top_reduction['symbol']} toward {top_reduction['suggested_weight'] * 100:.1f}% to lower concentration or weaker risk-adjusted exposure." ) if estimated_fees > 0: actions.append( f"Estimated rebalance fees are about TZS {estimated_fees:,.0f}, so the changes should be made only if the expected improvement is worth that cost." ) if not actions: actions.append("No major rebalance is needed; current weights are close to the suggested growth allocation.") return { "title": "Growth allocation view", "summary": summary, "key_actions": actions[:3], "caution": ( "This is an allocation model, not a guarantee. It uses historical prices, NAVs, " "dividends, fund payout flags, bond coupons, cached fundamentals, observed stock volume, and known transaction costs; financial statement quality and future news still need human review." ), } def _bond_execution_note(asset: OptimizerAsset) -> str: liquidity = asset.liquidity or {} secondary = liquidity.get("comparable_secondary_market") or [] primary = liquidity.get("comparable_primary_market") or [] parts = [] if liquidity.get("same_bond_secondary_available"): parts.append( f"same bond last traded {liquidity.get('latest_trade_date')} at yield " f"{_safe_float(liquidity.get('latest_yield_percent')):.2f}%" ) elif secondary: median_yield = liquidity.get("comparable_median_yield_percent") parts.append( f"no same-bond print; comparable secondary prints show median yield " f"{_safe_float(median_yield):.2f}%" ) else: parts.append("no fresh same-bond or comparable secondary print is stored") if primary: top = primary[0] parts.append( f"recent primary auction reference {top.get('bond_no')} " f"from {top.get('auction_date')} " f"({top.get('maturity_years')}y) approximate reinvestment yield " f"{_safe_float(top.get('implied_yield_percent')):.2f}% at price " f"{_safe_float(top.get('price_per_100')):.2f}" ) else: parts.append("no recent comparable primary auction reference is stored") caveat = liquidity.get("execution_caveat") if caveat: parts.append(str(caveat)) return "Secondary/primary market check: " + "; ".join(parts) def _bond_return_worth_note(asset: OptimizerAsset) -> str: liquidity = asset.liquidity or {} net_return = asset.expected_return - asset.fee_rate hurdle = DEFAULT_RISK_FREE_RATE spread = net_return - hurdle price_percent = _safe_float(liquidity.get("price_percent"), asset.current_price * 100) latest_yield = _safe_float(liquidity.get("latest_yield_percent"), net_return * 100) income_yield = asset.income_yield * 100 verdict = "adequate" if spread >= 0.015 else "thin" if spread >= 0 else "weak" premium_note = ( f" premium price {price_percent:.2f}% means coupon income is not the same as total return;" if price_percent > 110 else "" ) return ( f"Return worth check: {verdict}; market yield {latest_yield:.2f}% " f"vs {hurdle * 100:.2f}% hurdle ({spread * 100:+.2f}%), " f"income yield on invested price about {income_yield:.2f}%.{premium_note}" ) def _bond_coupon_advantage(asset: OptimizerAsset) -> float: liquidity = asset.liquidity or {} coupon = _safe_float(liquidity.get("coupon_rate_percent")) / 100 reinvestment_yield = _safe_float(liquidity.get("primary_reinvestment_yield_percent")) / 100 if coupon <= 0 or reinvestment_yield <= 0: return 0.0 return coupon - reinvestment_yield def _is_irreplaceable_income_bond(asset: OptimizerAsset) -> bool: return ( asset.asset_type == "BOND" and _bond_coupon_advantage(asset) >= HIGH_COUPON_ADVANTAGE_BPS / 10_000 ) def _has_rock_solid_fundamentals(asset: OptimizerAsset) -> bool: fundamentals = asset.fundamentals or {} if asset.asset_type != "STOCK": return False pe_ratio = _safe_float(fundamentals.get("pe_ratio")) pb_ratio = _safe_float(fundamentals.get("pb_ratio")) eps = _safe_float(fundamentals.get("eps")) debt_to_equity = _safe_float(fundamentals.get("debt_to_equity"), default=-1) return ( eps > 0 and 0 < pe_ratio <= 22 and 0 < pb_ratio <= 3.5 and 0 <= debt_to_equity <= 150 ) def _has_visible_macro_tailwind(asset: OptimizerAsset) -> bool: data = {**(asset.fundamentals or {}), **(asset.liquidity or {})} return bool( data.get("macro_tailwind") or data.get("sector_tailwind") or data.get("rate_tailwind") or str(data.get("macro_context") or "").lower() in {"tailwind", "supportive", "positive"} ) def _clean_exit_liquidity(asset: OptimizerAsset, trade_value: float) -> bool: liquidity = asset.liquidity or {} if asset.asset_type == "STOCK": max_buy_value = _safe_float(liquidity.get("max_buy_value_20d")) return max_buy_value > 0 and trade_value <= max_buy_value if asset.asset_type == "FUND": redemption_days = _safe_float(liquidity.get("redemption_days"), default=999) return redemption_days <= 7 and bool(liquidity.get("prospectus_backed")) return False def _replacement_quality_for_irreplaceable_bond( bond: OptimizerAsset, assets: list[OptimizerAsset], suggested_weights: np.ndarray, current_weights: np.ndarray, total_value: float, ) -> tuple[bool, list[str]]: coupon_advantage = _bond_coupon_advantage(bond) required_excess = coupon_advantage + LEGACY_BOND_REPLACEMENT_MARGIN increased_assets = [ (asset, float(suggested - current) * total_value) for asset, suggested, current in zip(assets, suggested_weights, current_weights) if asset.key != bond.key and suggested > current + 0.01 ] failures: list[str] = [] for asset, trade_value in increased_assets: net_return_advantage = asset.expected_return - asset.fee_rate - bond.income_yield gates = { "return_margin": net_return_advantage >= required_excess, "low_moderate_volatility": asset.volatility <= LOW_MODERATE_VOLATILITY_MAX, "rock_solid_fundamentals": _has_rock_solid_fundamentals(asset), "macro_tailwind": _has_visible_macro_tailwind(asset), "history_and_liquidity": ( _safe_float((asset.liquidity or {}).get("history_years")) >= MIN_REPLACEMENT_HISTORY_YEARS and _clean_exit_liquidity(asset, trade_value) ), } if all(gates.values()): return True, [] failed = ", ".join(name for name, passed in gates.items() if not passed) failures.append(f"{asset.symbol} failed replacement gates: {failed}") if not failures: failures.append("No increased replacement asset was available for the income stream.") return False, failures def _protect_irreplaceable_bonds( current_weights: np.ndarray, suggested_weights: np.ndarray, assets: list[OptimizerAsset], total_value: float, ) -> tuple[np.ndarray, list[str]]: protected = suggested_weights.copy() warnings: list[str] = [] for index, asset in enumerate(assets): if not _is_irreplaceable_income_bond(asset) or protected[index] >= current_weights[index]: continue qualified, failures = _replacement_quality_for_irreplaceable_bond( asset, assets, protected, current_weights, total_value, ) if qualified: continue restored = float(current_weights[index] - protected[index]) protected[index] = current_weights[index] warnings.append( f"{asset.symbol} reduction blocked: coupon materially exceeds recent primary reinvestment yield, so default is hold to maturity unless all replacement gates pass. " + " ".join(failures[:2]) ) receiver_indices = [ i for i, (current, suggested) in enumerate(zip(current_weights, protected)) if i != index and suggested > current ] receiver_total = sum(float(protected[i] - current_weights[i]) for i in receiver_indices) if receiver_total > 0: for i in receiver_indices: reduction = restored * float(protected[i] - current_weights[i]) / receiver_total protected[i] = max(current_weights[i], protected[i] - reduction) total = float(protected.sum()) if total > 0: protected = protected / total return protected, warnings def _optimize_sync( assets_payload: list[dict[str, Any]], simulations: int, risk_profile: str | None, risk_score: int | None, alternatives: list[dict[str, Any]], igrowth_context: dict[str, Any] | None = None, ) -> dict[str, Any]: assets = [OptimizerAsset(**payload) for payload in assets_payload] settings = _risk_settings(risk_profile, risk_score) total_value = sum(asset.current_value for asset in assets) current_weights = np.array([asset.current_value / total_value for asset in assets]) returns = np.array([asset.expected_return for asset in assets]) vols = np.array([asset.volatility for asset in assets]) income_yields = np.array([asset.income_yield for asset in assets]) best_weights = current_weights.copy() best_score = -float("inf") rng = np.random.default_rng(42) attempts = max(simulations, 1000) for _ in range(attempts): weights = rng.dirichlet(np.ones(len(assets))) if not _respects_limits(weights, assets, settings): continue stats = _portfolio_stats(weights, returns, vols) fees = _estimate_rebalance_fees(current_weights, weights, assets, total_value) fee_drag = fees / total_value if total_value else 0.0 income_score = float(np.dot(weights, income_yields)) liquidity_penalty = _liquidity_capacity_penalty(current_weights, weights, assets, total_value) score = ( stats["sharpe"] + settings["return_weight"] * stats["expected_return"] + settings["income_weight"] * income_score - settings["volatility_penalty"] * stats["volatility"] - fee_drag - liquidity_penalty ) if score > best_score: best_score = score best_weights = weights best_weights, liquidity_warnings = _apply_liquidity_caps( current_weights, best_weights, assets, total_value, settings, ) best_weights, bond_protection_warnings = _protect_irreplaceable_bonds( current_weights, best_weights, assets, total_value, ) liquidity_warnings.extend(bond_protection_warnings) liquidity_warnings.extend( _liquidity_trade_warnings(current_weights, best_weights, assets, total_value) ) current_stats = _portfolio_stats(current_weights, returns, vols) suggested_stats = _portfolio_stats(best_weights, returns, vols) estimated_fees = _estimate_rebalance_fees( current_weights, best_weights, assets, total_value ) fee_items = _fee_breakdown(current_weights, best_weights, assets, total_value) fee_summary = _fee_summary(fee_items) allocations = [] for asset, current_weight, suggested_weight in zip( assets, current_weights, best_weights ): difference = float(suggested_weight - current_weight) reason = "Hold allocation" if difference > 0.03: reason = ( "Buy new candidate after valuation, liquidity, fees, and prospectus checks" if asset.is_candidate else "Increase for growth-adjusted return after fees" ) if asset.asset_type == "BOND": reason = f"{reason}. {_bond_return_worth_note(asset)} {_bond_execution_note(asset)}" elif difference < -0.03: reason = "Reduce concentration or weaker risk-adjusted return" if asset.asset_type == "BOND": reason = f"{reason}. {_bond_return_worth_note(asset)} {_bond_execution_note(asset)}" elif asset.asset_type == "BOND": if _is_irreplaceable_income_bond(asset): reason = ( "Hold to maturity by default: coupon materially exceeds recent primary reinvestment yields, " "and no replacement passed all return, volatility, fundamentals, macro, history, and liquidity gates" ) reason = f"{reason}. {_bond_return_worth_note(asset)} {_bond_execution_note(asset)}" allocations.append( { "asset_id": asset.asset_id, "asset_type": asset.asset_type, "symbol": asset.symbol, "name": asset.name, "current_weight": round(float(current_weight), 4), "suggested_weight": round(float(suggested_weight), 4), "difference": round(difference, 4), "current_value": round(asset.current_value, 2), "suggested_value": round(float(suggested_weight) * total_value, 2), "rebalance_amount": round(difference * total_value, 2), "expected_return": round(asset.expected_return, 4), "volatility": round(asset.volatility, 4), "income_yield": round(asset.income_yield, 4), "fee_rate": round(asset.fee_rate, 4), "is_new_candidate": asset.is_candidate, "fundamentals": asset.fundamentals, "liquidity": asset.liquidity, "reason": reason, } ) sorted_allocations = sorted( allocations, key=lambda item: item["suggested_weight"], reverse=True, ) return { "objective": "growth", "risk_profile": settings["key"], "risk_label": settings["label"], "risk_score": settings["risk_score"], "total_value": round(total_value, 2), "risk_free_rate": DEFAULT_RISK_FREE_RATE, "constraints": { "max_asset_weight": max(float(settings["max_asset_weight"]), 1 / len(assets)), "class_limits": settings["class_limits"], "class_limits_apply_only_when_multiple_asset_classes_exist": True, }, "current": {k: round(v, 4) for k, v in current_stats.items()}, "suggested": {k: round(v, 4) for k, v in suggested_stats.items()}, "estimated_rebalance_fees": round(estimated_fees, 2), "fee_breakdown": fee_summary, "liquidity_warnings": liquidity_warnings, "igrowth_proxy_comparison": igrowth_context, "advisor_comment": _advisor_comment( current_stats, suggested_stats, sorted_allocations, estimated_fees, settings["label"], ), "allocations": sorted_allocations, "alternatives": alternatives, "notes": [ "Suggested allocations can include new stock and fund candidates when they pass valuation, liquidity, fee, and risk checks.", "Stock buy fee estimates use the stored DSE transaction fee bands and stock increases are capped by observed DSE turnover capacity.", "Stock dividends, fund income flags, bond coupons, and fund entry loads are included where available.", "Stock fundamentals use cached Simply Wall St snapshots when available; missing EPS, PE, or PB values are not guessed.", "Optimization uses Monte Carlo simulation to stay lightweight on Hugging Face.", ], } async def analyze_portfolio_growth_allocation( portfolio_id: int, simulations: int = DEFAULT_SIMULATIONS, risk_profile: str | None = None, risk_score: int | None = None, ) -> dict[str, Any]: current_assets = await _build_assets(portfolio_id) if not current_assets: raise ValueError("At least one priced asset is required for allocation analysis") alternatives = await _suggest_alternatives(current_assets, risk_profile, risk_score) settings = _risk_settings(risk_profile, risk_score) total_value = sum(asset.current_value for asset in current_assets) igrowth_context = await _igrowth_proxy_context(current_assets, total_value) candidate_assets = await _build_new_candidate_assets(current_assets, risk_profile, risk_score) assets = current_assets + candidate_assets if len(assets) == 1: asset = assets[0] current_stats = { "expected_return": asset.expected_return, "volatility": asset.volatility, "sharpe": ( (asset.expected_return - DEFAULT_RISK_FREE_RATE) / asset.volatility if asset.volatility > 0 else 0.0 ), } allocation = { "asset_id": asset.asset_id, "asset_type": asset.asset_type, "symbol": asset.symbol, "name": asset.name, "current_weight": 1.0, "suggested_weight": 1.0, "difference": 0.0, "current_value": round(asset.current_value, 2), "suggested_value": round(asset.current_value, 2), "rebalance_amount": 0.0, "expected_return": round(asset.expected_return, 4), "volatility": round(asset.volatility, 4), "fee_rate": round(asset.fee_rate, 4), "reason": "Only priced asset in portfolio; add other assets before optimization can rebalance.", } return { "objective": "growth", "risk_profile": settings["key"], "risk_label": settings["label"], "risk_score": settings["risk_score"], "total_value": round(asset.current_value, 2), "risk_free_rate": DEFAULT_RISK_FREE_RATE, "constraints": { "max_asset_weight": 1.0, "class_limits": GROWTH_CLASS_LIMITS, "class_limits_apply_only_when_multiple_asset_classes_exist": True, }, "current": {k: round(v, 4) for k, v in current_stats.items()}, "suggested": {k: round(v, 4) for k, v in current_stats.items()}, "estimated_rebalance_fees": 0.0, "fee_breakdown": _fee_summary([]), "advisor_comment": { "title": "Concentration warning", "summary": ( f"Your portfolio is currently 100% allocated to {asset.symbol}. " "That may have worked well, but the growth engine is concentrated in one security, " "so the next improvement is diversification rather than reweighting." ), "key_actions": [ "Add at least one more priced asset before running a full optimizer.", "For a growth portfolio, consider blending stocks with a liquid fund, ETF, or bond exposure to reduce single-name risk.", "Keep fees in mind: adding a new DSE stock position has transaction costs, so diversify in meaningful ticket sizes.", ], "caution": ( "The model cannot calculate an optimized allocation from one asset alone. " "It can only flag concentration risk and explain what data is missing." ), }, "allocations": [allocation], "alternatives": alternatives, "notes": [ "Single-asset portfolios receive a concentration analysis instead of a failed optimization.", "Add at least two priced assets to enable portfolio rebalancing advice.", ], } return await asyncio.to_thread( _optimize_sync, [asset.__dict__ for asset in assets], simulations, risk_profile, risk_score, alternatives, igrowth_context, ) async def run_portfolio_analysis_task( task_id: int, portfolio_id: int, simulations: int = DEFAULT_SIMULATIONS, risk_profile: str | None = None, risk_score: int | None = None, ) -> None: await ImportTask.filter(id=task_id).update( status=TaskStatus.RUNNING, details={ "portfolio_id": portfolio_id, "risk_profile": _risk_settings(risk_profile, risk_score)["key"], "risk_score": _risk_settings(risk_profile, risk_score)["risk_score"], "status_message": "Portfolio growth allocation analysis is running", }, ) try: result = await analyze_portfolio_growth_allocation(portfolio_id, simulations, risk_profile, risk_score) except Exception as exc: await ImportTask.filter(id=task_id).update( status=TaskStatus.FAILED, details={ "portfolio_id": portfolio_id, "error": str(exc), }, ) return await ImportTask.filter(id=task_id).update( status=TaskStatus.COMPLETED, details={ "portfolio_id": portfolio_id, "result": result, }, )