Spaces:
Sleeping
Sleeping
| import asyncio | |
| from dataclasses import dataclass | |
| from dataclasses import replace | |
| from datetime import date, timedelta | |
| from decimal import Decimal, InvalidOperation | |
| import re | |
| from typing import Any | |
| import numpy as np | |
| from App.analysis.volatility import calculate_annualized_volatility | |
| from App.routers.bonds.models import Bond | |
| from App.routers.economy.valuation import get_bond_pricing_context | |
| from App.routers.funds.models import FundPerformance, MutualFund | |
| from App.routers.stocks.metrics import calculate_metrics, calculate_liquidity_metrics | |
| from App.routers.stocks.models import Stock, StockPriceData | |
| from App.routers.stocks.seed import load_dse_transaction_fee_seed_data | |
| from App.routers.tasks.models import ImportTask, TaskStatus | |
| GROWTH_CLASS_LIMITS = { | |
| "STOCK": (0.30, 0.70), | |
| "FUND": (0.10, 0.45), | |
| "BOND": (0.05, 0.35), | |
| } | |
| MAX_ASSET_WEIGHT = 0.35 | |
| RISK_PROFILES = { | |
| "conservative": { | |
| "label": "Conservative", | |
| "class_limits": {"STOCK": (0.00, 0.35), "FUND": (0.25, 0.70), "BOND": (0.15, 0.60)}, | |
| "max_asset_weight": 0.25, | |
| "return_weight": 0.55, | |
| "volatility_penalty": 1.35, | |
| "income_weight": 0.45, | |
| }, | |
| "balanced": { | |
| "label": "Balanced", | |
| "class_limits": GROWTH_CLASS_LIMITS, | |
| "max_asset_weight": MAX_ASSET_WEIGHT, | |
| "return_weight": 0.85, | |
| "volatility_penalty": 0.85, | |
| "income_weight": 0.25, | |
| }, | |
| "growth": { | |
| "label": "Growth", | |
| "class_limits": {"STOCK": (0.30, 0.75), "FUND": (0.05, 0.45), "BOND": (0.00, 0.25)}, | |
| "max_asset_weight": 0.40, | |
| "return_weight": 1.15, | |
| "volatility_penalty": 0.45, | |
| "income_weight": 0.10, | |
| }, | |
| } | |
| DEFAULT_RISK_FREE_RATE = 0.12 | |
| DEFAULT_SIMULATIONS = 6000 | |
| HIGH_COUPON_ADVANTAGE_BPS = 150 | |
| LEGACY_BOND_REPLACEMENT_MARGIN = 0.06 | |
| LOW_MODERATE_VOLATILITY_MAX = 0.16 | |
| MIN_REPLACEMENT_HISTORY_YEARS = 3.0 | |
| class OptimizerAsset: | |
| key: str | |
| asset_id: int | |
| asset_type: str | |
| symbol: str | |
| name: str | |
| quantity: float | |
| current_price: float | |
| current_value: float | |
| expected_return: float | |
| volatility: float | |
| income_yield: float | |
| fee_rate: float | |
| fundamentals: dict[str, Any] | None = None | |
| liquidity: dict[str, Any] | None = None | |
| is_candidate: bool = False | |
| def _blend_pair(start: tuple[float, float], end: tuple[float, float], t: float) -> tuple[float, float]: | |
| return ( | |
| start[0] + (end[0] - start[0]) * t, | |
| start[1] + (end[1] - start[1]) * t, | |
| ) | |
| def _risk_settings(risk_profile: str | None, risk_score: int | None = None) -> dict[str, Any]: | |
| if risk_score is not None: | |
| score = max(0, min(100, int(risk_score))) | |
| t = score / 100 | |
| low = RISK_PROFILES["conservative"] | |
| high = RISK_PROFILES["growth"] | |
| if score < 34: | |
| label = "Conservative" | |
| elif score > 66: | |
| label = "Aggressive" | |
| else: | |
| label = "Balanced" | |
| return { | |
| "key": "conservative" if score < 34 else "growth" if score > 66 else "balanced", | |
| "label": label, | |
| "risk_score": score, | |
| "class_limits": { | |
| asset_type: _blend_pair(low["class_limits"][asset_type], high["class_limits"][asset_type], t) | |
| for asset_type in GROWTH_CLASS_LIMITS | |
| }, | |
| "max_asset_weight": low["max_asset_weight"] + (high["max_asset_weight"] - low["max_asset_weight"]) * t, | |
| "return_weight": low["return_weight"] + (high["return_weight"] - low["return_weight"]) * t, | |
| "volatility_penalty": low["volatility_penalty"] + (high["volatility_penalty"] - low["volatility_penalty"]) * t, | |
| "income_weight": low["income_weight"] + (high["income_weight"] - low["income_weight"]) * t, | |
| } | |
| key = str(risk_profile or "balanced").strip().lower() | |
| if key in {"low", "safe", "safer", "defensive"}: | |
| key = "conservative" | |
| elif key in {"high", "aggressive"}: | |
| key = "growth" | |
| settings = RISK_PROFILES.get(key, RISK_PROFILES["balanced"]) | |
| fallback_score = 15 if key == "conservative" else 85 if key == "growth" else 50 | |
| return {"key": key if key in RISK_PROFILES else "balanced", "risk_score": fallback_score, **settings} | |
| def _safe_float(value: Any, default: float = 0.0) -> float: | |
| if value in (None, ""): | |
| return default | |
| try: | |
| return float(value) | |
| except (TypeError, ValueError, InvalidOperation): | |
| return default | |
| def _annualized_return(values: list[tuple[date, float]]) -> float | None: | |
| ordered = [(d, v) for d, v in sorted(values, key=lambda item: item[0]) if v > 0] | |
| if len(ordered) < 2: | |
| return None | |
| start_date, start_value = ordered[0] | |
| end_date, end_value = ordered[-1] | |
| days = max((end_date - start_date).days, 1) | |
| total_return = (end_value / start_value) - 1 | |
| return (1 + total_return) ** (365 / days) - 1 | |
| def _history_years(values: list[tuple[date, float]]) -> float: | |
| ordered = [item for item in sorted(values, key=lambda item: item[0]) if item[1] > 0] | |
| if len(ordered) < 2: | |
| return 0.0 | |
| return max((ordered[-1][0] - ordered[0][0]).days / 365.25, 0.0) | |
| def _stock_fee_band(consideration: float) -> dict[str, Any] | None: | |
| payload = load_dse_transaction_fee_seed_data() | |
| for band in payload.get("bands", []): | |
| min_value = _safe_float(band.get("min_consideration")) | |
| max_raw = band.get("max_consideration") | |
| max_value = float("inf") if max_raw is None else _safe_float(max_raw) | |
| if consideration > min_value and consideration <= max_value: | |
| return band | |
| return None | |
| def _stock_fee_rate(consideration: float) -> float: | |
| band = _stock_fee_band(consideration) | |
| if band: | |
| return _safe_float(band.get("total_cost_to_investor")) / 100 | |
| return 0.0206 | |
| def _parse_percent(raw_value: Any) -> float: | |
| if raw_value is None: | |
| return 0.0 | |
| text = str(raw_value) | |
| digits = "".join(ch for ch in text if ch.isdigit() or ch == ".") | |
| return _safe_float(digits) / 100 if digits else 0.0 | |
| def _parse_distribution_rate(raw_value: Any) -> float: | |
| text = str(raw_value or "") | |
| matches = re.findall(r"(\d+(?:\.\d+)?)\s*%", text) | |
| if not matches: | |
| return 0.0 | |
| return max(_safe_float(match) / 100 for match in matches) | |
| def _parse_money_value(raw_value: Any) -> float | None: | |
| if raw_value in (None, ""): | |
| return None | |
| digits = "".join(ch for ch in str(raw_value) if ch.isdigit() or ch == ".") | |
| return _safe_float(digits) if digits else None | |
| def _parse_fee_rate(raw_value: Any) -> float: | |
| text = str(raw_value or "").lower() | |
| if not text or "none" in text or "nil" in text: | |
| return 0.0 | |
| match = re.search(r"(\d+(?:\.\d+)?)\s*%", text) | |
| return _safe_float(match.group(1)) / 100 if match else 0.0 | |
| async def _fund_prospectus_terms(fund: MutualFund) -> dict[str, Any]: | |
| try: | |
| info = await fund.info_record | |
| except Exception: | |
| info = None | |
| raw_data = info.raw_data if info and isinstance(info.raw_data, dict) else {} | |
| liquidity = raw_data.get("liquidity") if isinstance(raw_data.get("liquidity"), dict) else {} | |
| contributions = raw_data.get("contributions") if isinstance(raw_data.get("contributions"), dict) else {} | |
| documents = raw_data.get("documents") if isinstance(raw_data.get("documents"), list) else [] | |
| min_initial = ( | |
| _parse_money_value(raw_data.get("min_initial")) | |
| or _parse_money_value(fund.min_initial) | |
| or _parse_money_value(contributions.get("min_initial")) | |
| ) | |
| min_additional = ( | |
| _parse_money_value(raw_data.get("min_additional")) | |
| or _parse_money_value(fund.min_additional) | |
| or _parse_money_value(contributions.get("min_additional")) | |
| ) | |
| min_redemption = ( | |
| _parse_money_value(raw_data.get("min_redemption")) | |
| or _parse_money_value(liquidity.get("min_redemption")) | |
| ) | |
| lock_in_days = raw_data.get("lock_in_days") or liquidity.get("lock_in_days") | |
| redemption_days = fund.redemption_days or raw_data.get("redemption_days") or liquidity.get("redemption_days") | |
| exit_fee_rate = _parse_fee_rate(fund.exit_load or raw_data.get("exit_load") or liquidity.get("exit_fee")) | |
| entry_fee_rate = _safe_float(fund.entry_load) / 100 | |
| return { | |
| "prospectus_backed": bool(raw_data or documents), | |
| "prospectus_documents": [ | |
| item.get("path") or item.get("name") | |
| for item in documents | |
| if isinstance(item, dict) and (item.get("path") or item.get("name")) | |
| ], | |
| "min_initial": min_initial, | |
| "min_additional": min_additional, | |
| "min_redemption": min_redemption, | |
| "lock_in_days": int(lock_in_days) if str(lock_in_days or "").isdigit() else None, | |
| "redemption_days": int(redemption_days) if str(redemption_days or "").isdigit() else redemption_days, | |
| "entry_fee_rate": entry_fee_rate, | |
| "exit_fee_rate": exit_fee_rate, | |
| "exit_load": fund.exit_load, | |
| "fund_type": fund.fund_type, | |
| "currency": fund.currency, | |
| } | |
| def _fundamental_adjusted_stock_return( | |
| base_return: float, | |
| dividend_yield: float, | |
| fundamentals: dict[str, Any], | |
| liquidity: dict[str, Any], | |
| ) -> float: | |
| pe_ratio = _safe_float(fundamentals.get("pe_ratio")) | |
| pb_ratio = _safe_float(fundamentals.get("pb_ratio")) | |
| eps = fundamentals.get("eps") | |
| payout_ratio = _safe_float(fundamentals.get("payout_ratio")) | |
| debt_to_equity = _safe_float(fundamentals.get("debt_to_equity")) | |
| liquidity_score = _safe_float(liquidity.get("liquidity_score")) | |
| adjusted = base_return | |
| if pe_ratio > 0: | |
| earnings_yield = 1 / pe_ratio | |
| adjusted = (adjusted * 0.75) + ((earnings_yield + dividend_yield) * 0.25) | |
| if pe_ratio > 30: | |
| adjusted -= 0.035 | |
| elif pe_ratio > 22: | |
| adjusted -= 0.015 | |
| elif pe_ratio < 10: | |
| adjusted += 0.01 | |
| elif eps not in (None, ""): | |
| adjusted -= 0.07 | |
| else: | |
| adjusted -= 0.015 | |
| if pb_ratio > 0: | |
| if pb_ratio > 5: | |
| adjusted -= 0.05 | |
| elif pb_ratio > 3: | |
| adjusted -= 0.025 | |
| elif pb_ratio <= 2: | |
| adjusted += 0.01 | |
| if payout_ratio > 100: | |
| adjusted -= 0.025 | |
| if debt_to_equity > 150: | |
| adjusted -= 0.015 | |
| if liquidity_score < 25: | |
| adjusted -= 0.05 | |
| elif liquidity_score < 45: | |
| adjusted -= 0.025 | |
| elif liquidity_score > 75: | |
| adjusted += 0.01 | |
| return adjusted | |
| async def _fund_income_yield(fund: MutualFund) -> float: | |
| if not fund.pays_income: | |
| return 0.0 | |
| try: | |
| info = await fund.info_record | |
| except Exception: | |
| info = None | |
| data = info.raw_data if info and isinstance(info.raw_data, dict) else {} | |
| distribution = data.get("distribution") if isinstance(data.get("distribution"), dict) else {} | |
| other_facts = data.get("other_facts") if isinstance(data.get("other_facts"), dict) else {} | |
| candidates = [ | |
| fund.income_amount, | |
| distribution.get("max_distribution_rate"), | |
| distribution.get("policy"), | |
| other_facts.get("max_distribution_rate"), | |
| ] | |
| return min(max((_parse_distribution_rate(value) for value in candidates), default=0.0), 0.18) | |
| async def _collect_positions(portfolio_id: int) -> list[dict[str, Any]]: | |
| from App.routers.portfolio.service import PortfolioService | |
| positions = await PortfolioService.get_positions(portfolio_id) | |
| return [ | |
| { | |
| "asset_type": str(position["asset_type"]).upper(), | |
| "asset_id": position["asset_id"], | |
| "quantity": Decimal(str(position["quantity"])), | |
| } | |
| for position in positions | |
| if _safe_float(position.get("current_value")) > 0 | |
| ] | |
| async def _build_stock_asset(position: dict[str, Any]) -> OptimizerAsset | None: | |
| stock = await Stock.get_or_none(id=position["asset_id"]) | |
| if stock is None: | |
| return None | |
| prices = await StockPriceData.filter( | |
| stock_id=stock.id, | |
| date__gte=date.today() - timedelta(days=365 * 5 + 7), | |
| ).order_by("date").values("date", "closing_price") | |
| price_values = [(row["date"], _safe_float(row["closing_price"])) for row in prices] | |
| latest = await StockPriceData.filter(stock=stock).order_by("-date").first() | |
| if latest is None: | |
| return None | |
| metrics = await calculate_metrics(stock) or {} | |
| capital_return = _annualized_return(price_values) | |
| dividend_yield = _safe_float(metrics.get("dividend_yield")) / 100 | |
| liquidity = metrics.get("liquidity") or await calculate_liquidity_metrics(stock) | |
| liquidity["history_years"] = round(_history_years(price_values), 2) | |
| fundamentals = { | |
| "eps": metrics.get("eps"), | |
| "pe_ratio": metrics.get("pe_ratio"), | |
| "pb_ratio": metrics.get("pb_ratio") or metrics.get("price_to_book"), | |
| "ps_ratio": metrics.get("ps_ratio"), | |
| "earnings_yield": metrics.get("earnings_yield"), | |
| "payout_ratio": metrics.get("payout_ratio"), | |
| "gross_margin": metrics.get("gross_margin"), | |
| "net_margin": metrics.get("net_margin"), | |
| "debt_to_equity": metrics.get("debt_to_equity"), | |
| "source_name": metrics.get("source_name"), | |
| "source_url": metrics.get("source_url"), | |
| "data_updated_at": metrics.get("data_updated_at"), | |
| } | |
| expected_return = _fundamental_adjusted_stock_return( | |
| base_return=(capital_return if capital_return is not None else 0.10) + dividend_yield, | |
| dividend_yield=dividend_yield, | |
| fundamentals=fundamentals, | |
| liquidity=liquidity, | |
| ) | |
| volatility_result = calculate_annualized_volatility(price_values) | |
| volatility = ( | |
| volatility_result.annualized_volatility if volatility_result else 0.28 | |
| ) | |
| current_price = _safe_float(latest.closing_price) | |
| quantity = _safe_float(position["quantity"]) | |
| current_value = quantity * current_price | |
| return OptimizerAsset( | |
| key=f"STOCK:{stock.id}", | |
| asset_id=stock.id, | |
| asset_type="STOCK", | |
| symbol=stock.symbol, | |
| name=stock.name, | |
| quantity=quantity, | |
| current_price=current_price, | |
| current_value=current_value, | |
| expected_return=max(min(expected_return, 0.60), -0.30), | |
| volatility=max(volatility, 0.08), | |
| income_yield=max(dividend_yield, 0.0), | |
| fee_rate=_stock_fee_rate(current_value), | |
| fundamentals=fundamentals, | |
| liquidity=liquidity, | |
| ) | |
| async def _build_fund_asset(position: dict[str, Any]) -> OptimizerAsset | None: | |
| fund = await MutualFund.get_or_none(id=position["asset_id"]) | |
| if fund is None: | |
| return None | |
| rows = await FundPerformance.filter( | |
| fund_id=fund.id, | |
| record_date__gte=date.today() - timedelta(days=365 * 5 + 7), | |
| ).order_by("record_date").values("record_date", "nav_per_unit") | |
| nav_values = [ | |
| (row["record_date"], _safe_float(row["nav_per_unit"])) | |
| for row in rows | |
| if row.get("nav_per_unit") is not None | |
| ] | |
| latest = await FundPerformance.filter(fund=fund).order_by("-record_date").first() | |
| if latest is None or latest.nav_per_unit is None: | |
| return None | |
| income_yield = await _fund_income_yield(fund) | |
| expected_return = (_annualized_return(nav_values) or 0.13) + income_yield | |
| volatility_result = calculate_annualized_volatility(nav_values) | |
| volatility = ( | |
| volatility_result.annualized_volatility if volatility_result else 0.08 | |
| ) | |
| current_price = _safe_float(latest.nav_per_unit) | |
| quantity = _safe_float(position["quantity"]) | |
| current_value = quantity * current_price | |
| prospectus_terms = await _fund_prospectus_terms(fund) | |
| redemption_days = prospectus_terms.get("redemption_days") | |
| liquidity_score = 82 if isinstance(redemption_days, int) and redemption_days <= 7 else 65 | |
| if prospectus_terms.get("lock_in_days"): | |
| liquidity_score -= min(int(prospectus_terms["lock_in_days"]) / 180 * 30, 30) | |
| if not prospectus_terms.get("prospectus_backed"): | |
| liquidity_score -= 12 | |
| if str(prospectus_terms.get("currency") or "TZS").upper() != "TZS": | |
| liquidity_score -= 18 | |
| return OptimizerAsset( | |
| key=f"FUND:{fund.id}", | |
| asset_id=fund.id, | |
| asset_type="FUND", | |
| symbol=fund.name[:10].upper(), | |
| name=fund.name, | |
| quantity=quantity, | |
| current_price=current_price, | |
| current_value=current_value, | |
| expected_return=max(min(expected_return, 0.35), 0.03), | |
| volatility=max(volatility, 0.03), | |
| income_yield=income_yield, | |
| fee_rate=prospectus_terms["entry_fee_rate"], | |
| liquidity={ | |
| **prospectus_terms, | |
| "liquidity_score": round(max(liquidity_score, 20), 2), | |
| "history_years": round(_history_years(nav_values), 2), | |
| }, | |
| ) | |
| async def _build_bond_asset(position: dict[str, Any]) -> OptimizerAsset | None: | |
| bond = await Bond.get_or_none(id=position["asset_id"]) | |
| if bond is None: | |
| return None | |
| pricing_context = await get_bond_pricing_context(bond) | |
| price_per_100 = _safe_float(pricing_context.get("price_percent"), 100.0) | |
| current_price = price_per_100 / 100 | |
| quantity = _safe_float(position["quantity"]) | |
| current_value = quantity * current_price | |
| coupon_rate = _safe_float(bond.coupon_rate) / 100 | |
| latest_yield_percent = pricing_context.get("latest_yield_percent") | |
| market_yield = ( | |
| _safe_float(latest_yield_percent) / 100 | |
| if latest_yield_percent not in (None, "") | |
| else 0.0 | |
| ) | |
| expected_return = market_yield if market_yield > 0 else max(coupon_rate, 0.10) | |
| income_yield = coupon_rate / current_price if current_price > 0 else coupon_rate | |
| return OptimizerAsset( | |
| key=f"BOND:{bond.id}", | |
| asset_id=bond.id, | |
| asset_type="BOND", | |
| symbol=bond.isin, | |
| name=f"{bond.maturity_years} Yr Treasury Bond", | |
| quantity=quantity, | |
| current_price=current_price, | |
| current_value=current_value, | |
| expected_return=expected_return, | |
| volatility=0.04, | |
| income_yield=income_yield, | |
| fee_rate=0.0, | |
| liquidity={ | |
| "liquidity_score": pricing_context.get("liquidity_score", 45), | |
| "valuation_quality": pricing_context.get("valuation_quality"), | |
| "execution_caveat": pricing_context.get("execution_caveat"), | |
| "latest_trade_date": pricing_context.get("latest_trade_date"), | |
| "price_source": pricing_context.get("price_source"), | |
| "latest_yield_percent": latest_yield_percent, | |
| "coupon_rate_percent": _safe_float(bond.coupon_rate), | |
| "price_percent": price_per_100, | |
| "primary_reinvestment_yield_percent": ( | |
| _safe_float((pricing_context.get("comparable_primary_market") or [{}])[0].get("implied_yield_percent")) | |
| if pricing_context.get("comparable_primary_market") | |
| else None | |
| ), | |
| "same_bond_secondary_available": pricing_context.get("same_bond_secondary_available"), | |
| "comparable_secondary_market": pricing_context.get("comparable_secondary_market") or [], | |
| "comparable_median_yield_percent": pricing_context.get("comparable_median_yield_percent"), | |
| "comparable_primary_market": pricing_context.get("comparable_primary_market") or [], | |
| }, | |
| ) | |
| async def _build_assets(portfolio_id: int) -> list[OptimizerAsset]: | |
| assets: list[OptimizerAsset] = [] | |
| for position in await _collect_positions(portfolio_id): | |
| asset_type = position["asset_type"] | |
| if asset_type == "STOCK": | |
| asset = await _build_stock_asset(position) | |
| elif asset_type in {"FUND", "UTT"}: | |
| asset = await _build_fund_asset(position) | |
| elif asset_type == "BOND": | |
| asset = await _build_bond_asset(position) | |
| else: | |
| asset = None | |
| if asset and asset.current_value > 0: | |
| assets.append(asset) | |
| return assets | |
| def _asset_candidate_score(asset: OptimizerAsset, settings: dict[str, Any]) -> float: | |
| liquidity_score = _safe_float((asset.liquidity or {}).get("liquidity_score")) / 100 | |
| valuation_penalty = 0.0 | |
| if asset.asset_type == "STOCK": | |
| pe_ratio = _safe_float((asset.fundamentals or {}).get("pe_ratio")) | |
| pb_ratio = _safe_float((asset.fundamentals or {}).get("pb_ratio")) | |
| if pe_ratio > 30 or pb_ratio > 5: | |
| valuation_penalty += 0.05 | |
| if pe_ratio < 0: | |
| valuation_penalty += 0.08 | |
| return ( | |
| asset.expected_return * settings["return_weight"] | |
| + asset.income_yield * settings["income_weight"] | |
| + liquidity_score * 0.08 | |
| - asset.volatility * settings["volatility_penalty"] | |
| - valuation_penalty | |
| ) | |
| async def _build_new_candidate_assets( | |
| current_assets: list[OptimizerAsset], | |
| risk_profile: str | None, | |
| risk_score: int | None, | |
| limit: int = 8, | |
| ) -> list[OptimizerAsset]: | |
| current_keys = {asset.key for asset in current_assets} | |
| settings = _risk_settings(risk_profile, risk_score) | |
| candidates: list[OptimizerAsset] = [] | |
| funds = await MutualFund.filter(status="Active").all() | |
| for fund in funds: | |
| if f"FUND:{fund.id}" in current_keys: | |
| continue | |
| asset = await _build_fund_asset({"asset_id": fund.id, "quantity": Decimal("1")}) | |
| if asset: | |
| if not (asset.liquidity or {}).get("prospectus_backed"): | |
| continue | |
| if str((asset.liquidity or {}).get("currency") or "TZS").upper() != "TZS": | |
| continue | |
| candidates.append(replace(asset, quantity=0.0, current_value=0.0, is_candidate=True)) | |
| stocks = await Stock.all() | |
| for stock in stocks: | |
| if f"STOCK:{stock.id}" in current_keys: | |
| continue | |
| asset = await _build_stock_asset({"asset_id": stock.id, "quantity": Decimal("1")}) | |
| if not asset: | |
| continue | |
| max_buy_value = _safe_float((asset.liquidity or {}).get("max_buy_value_20d")) | |
| if max_buy_value <= 0 and _safe_float((asset.liquidity or {}).get("liquidity_score")) < 20: | |
| continue | |
| candidates.append(replace(asset, quantity=0.0, current_value=0.0, is_candidate=True)) | |
| return sorted( | |
| candidates, | |
| key=lambda asset: _asset_candidate_score(asset, settings), | |
| reverse=True, | |
| )[:limit] | |
| async def _stock_candidate(stock: Stock) -> dict[str, Any] | None: | |
| asset = await _build_stock_asset({"asset_id": stock.id, "quantity": Decimal("1")}) | |
| if asset is None: | |
| return None | |
| return { | |
| "asset_id": asset.asset_id, | |
| "asset_type": asset.asset_type, | |
| "symbol": asset.symbol, | |
| "name": asset.name, | |
| "expected_return": round(asset.expected_return, 4), | |
| "volatility": round(asset.volatility, 4), | |
| "income_yield": round(asset.income_yield, 4), | |
| "liquidity": asset.liquidity, | |
| "fundamentals": asset.fundamentals, | |
| "reason": "Stock candidate with fundamentals, dividend history, and observed DSE liquidity.", | |
| } | |
| async def _fund_candidate(fund: MutualFund) -> dict[str, Any] | None: | |
| asset = await _build_fund_asset({"asset_id": fund.id, "quantity": Decimal("1")}) | |
| if asset is None: | |
| return None | |
| return { | |
| "asset_id": asset.asset_id, | |
| "asset_type": asset.asset_type, | |
| "symbol": asset.symbol, | |
| "name": asset.name, | |
| "expected_return": round(asset.expected_return, 4), | |
| "volatility": round(asset.volatility, 4), | |
| "income_yield": round(asset.income_yield, 4), | |
| "pays_income": asset.income_yield > 0, | |
| "liquidity": asset.liquidity, | |
| "reason": ( | |
| "Income-paying fund candidate with lower measured volatility." | |
| if asset.income_yield > 0 | |
| else "Fund candidate with lower measured volatility." | |
| ), | |
| } | |
| async def _suggest_alternatives(current_assets: list[OptimizerAsset], risk_profile: str | None, risk_score: int | None) -> list[dict[str, Any]]: | |
| current_keys = {asset.key for asset in current_assets} | |
| settings = _risk_settings(risk_profile, risk_score) | |
| candidates: list[dict[str, Any]] = [] | |
| funds = await MutualFund.filter(status="Active").all() | |
| for fund in funds: | |
| if f"FUND:{fund.id}" in current_keys: | |
| continue | |
| candidate = await _fund_candidate(fund) | |
| if candidate: | |
| candidates.append(candidate) | |
| stocks = await Stock.all() | |
| for stock in stocks: | |
| if f"STOCK:{stock.id}" in current_keys: | |
| continue | |
| candidate = await _stock_candidate(stock) | |
| if candidate: | |
| candidates.append(candidate) | |
| if not candidates: | |
| return [] | |
| current_volatility = max((asset.volatility for asset in current_assets), default=0.0) | |
| max_volatility = current_volatility * (0.85 if settings["key"] == "conservative" else 1.10) | |
| safer = [item for item in candidates if item["volatility"] <= max_volatility] or candidates | |
| def score(item: dict[str, Any]) -> float: | |
| return ( | |
| item["expected_return"] * settings["return_weight"] | |
| + item.get("income_yield", 0.0) * settings["income_weight"] | |
| - item["volatility"] * settings["volatility_penalty"] | |
| ) | |
| return sorted(safer, key=score, reverse=True)[:5] | |
| def _is_igrowth(asset: OptimizerAsset) -> bool: | |
| text = f"{asset.symbol} {asset.name}".upper() | |
| return "IGROWTH" in text or ("ITRUST" in text and "GROWTH" in text) | |
| async def _igrowth_proxy_context(assets: list[OptimizerAsset], total_value: float) -> dict[str, Any] | None: | |
| igrowth_assets = [asset for asset in assets if _is_igrowth(asset)] | |
| if not igrowth_assets: | |
| return None | |
| stock_candidates: list[OptimizerAsset] = [] | |
| for stock in await Stock.all(): | |
| asset = await _build_stock_asset({"asset_id": stock.id, "quantity": Decimal("1")}) | |
| if asset is None: | |
| continue | |
| liquidity_score = _safe_float((asset.liquidity or {}).get("liquidity_score")) | |
| max_buy_value = _safe_float((asset.liquidity or {}).get("max_buy_value_20d")) | |
| if liquidity_score >= 35 and max_buy_value >= total_value * 0.02: | |
| stock_candidates.append(asset) | |
| if not stock_candidates: | |
| return { | |
| "method": "top-liquid-stock proxy", | |
| "available": False, | |
| "message": "IGROWTH holdings were not available and no liquid stock proxy basket passed the volume screen.", | |
| } | |
| top = sorted( | |
| stock_candidates, | |
| key=lambda asset: _asset_candidate_score(asset, _risk_settings("growth", None)), | |
| reverse=True, | |
| )[:5] | |
| weights = np.array([ | |
| max(_safe_float((asset.liquidity or {}).get("liquidity_score")), 1.0) | |
| for asset in top | |
| ]) | |
| weights = weights / weights.sum() | |
| proxy_return = float(np.dot(weights, np.array([asset.expected_return for asset in top]))) | |
| proxy_capacity = sum(_safe_float((asset.liquidity or {}).get("max_buy_value_20d")) for asset in top) | |
| notes = [] | |
| for asset in igrowth_assets: | |
| asset.expected_return = min(asset.expected_return + 0.01, 0.35) | |
| gap = proxy_return - asset.expected_return | |
| if gap > 0.05 and proxy_capacity >= asset.current_value * 0.25: | |
| penalty = min(0.04, gap * 0.25) | |
| asset.expected_return = max(asset.expected_return - penalty, 0.03) | |
| notes.append( | |
| f"{asset.symbol} underperformed the liquid-stock proxy, but keeps a liquidity/diversification credit." | |
| ) | |
| return { | |
| "method": "top-liquid-stock proxy", | |
| "available": True, | |
| "igrowth_symbols": [asset.symbol for asset in igrowth_assets], | |
| "proxy_expected_return": round(proxy_return, 4), | |
| "proxy_capacity_20d": round(proxy_capacity, 2), | |
| "proxy_assets": [ | |
| { | |
| "symbol": asset.symbol, | |
| "name": asset.name, | |
| "expected_return": round(asset.expected_return, 4), | |
| "volatility": round(asset.volatility, 4), | |
| "liquidity_score": round(_safe_float((asset.liquidity or {}).get("liquidity_score")), 2), | |
| "max_buy_value_20d": round(_safe_float((asset.liquidity or {}).get("max_buy_value_20d")), 2), | |
| } | |
| for asset in top | |
| ], | |
| "notes": notes | |
| or [ | |
| "IGROWTH is treated as a liquid/diversified wrapper and compared against a volume-screened top-stock proxy." | |
| ], | |
| } | |
| def _portfolio_stats(weights: np.ndarray, returns: np.ndarray, vols: np.ndarray) -> dict: | |
| portfolio_return = float(np.dot(weights, returns)) | |
| portfolio_vol = float(np.sqrt(np.dot(weights**2, vols**2))) | |
| sharpe = ( | |
| (portfolio_return - DEFAULT_RISK_FREE_RATE) / portfolio_vol | |
| if portfolio_vol > 0 | |
| else 0.0 | |
| ) | |
| return { | |
| "expected_return": portfolio_return, | |
| "volatility": portfolio_vol, | |
| "sharpe": sharpe, | |
| } | |
| def _class_totals(weights: np.ndarray, assets: list[OptimizerAsset]) -> dict[str, float]: | |
| totals = {"STOCK": 0.0, "FUND": 0.0, "BOND": 0.0} | |
| for weight, asset in zip(weights, assets): | |
| totals[asset.asset_type] = totals.get(asset.asset_type, 0.0) + float(weight) | |
| return totals | |
| def _respects_limits(weights: np.ndarray, assets: list[OptimizerAsset], settings: dict[str, Any]) -> bool: | |
| effective_max_asset_weight = max(float(settings["max_asset_weight"]), 1 / len(assets)) | |
| if np.any(weights < 0) or np.any(weights > effective_max_asset_weight): | |
| return False | |
| available_classes = {asset.asset_type for asset in assets} | |
| if len(available_classes) == 1: | |
| return True | |
| class_totals = _class_totals(weights, assets) | |
| class_limits = settings["class_limits"] | |
| for asset_type in available_classes: | |
| total = class_totals.get(asset_type, 0.0) | |
| min_weight, max_weight = class_limits[asset_type] | |
| if total < min_weight or total > max_weight: | |
| return False | |
| return True | |
| def _estimate_rebalance_fees( | |
| current_weights: np.ndarray, | |
| suggested_weights: np.ndarray, | |
| assets: list[OptimizerAsset], | |
| total_value: float, | |
| ) -> float: | |
| return sum(item["estimated_fee"] for item in _fee_breakdown(current_weights, suggested_weights, assets, total_value)) | |
| def _liquidity_trade_warnings( | |
| current_weights: np.ndarray, | |
| suggested_weights: np.ndarray, | |
| assets: list[OptimizerAsset], | |
| total_value: float, | |
| ) -> list[str]: | |
| warnings = [] | |
| for current, suggested, asset in zip(current_weights, suggested_weights, assets): | |
| if asset.asset_type != "STOCK" or suggested <= current: | |
| continue | |
| trade_amount = float(suggested - current) * total_value | |
| max_buy_value = _safe_float((asset.liquidity or {}).get("max_buy_value_20d")) | |
| if max_buy_value <= 0: | |
| warnings.append(f"{asset.symbol} has no recent turnover capacity, so new buys should be treated as speculative.") | |
| elif trade_amount > max_buy_value: | |
| warnings.append( | |
| f"{asset.symbol} buy is capped by observed volume: suggested demand exceeds about 10% of 20 trading days of average turnover." | |
| ) | |
| return warnings | |
| def _liquidity_capacity_penalty( | |
| current_weights: np.ndarray, | |
| suggested_weights: np.ndarray, | |
| assets: list[OptimizerAsset], | |
| total_value: float, | |
| ) -> float: | |
| penalty = 0.0 | |
| for current, suggested, asset in zip(current_weights, suggested_weights, assets): | |
| if suggested <= current: | |
| continue | |
| trade_amount = float(suggested - current) * total_value | |
| if asset.asset_type == "STOCK": | |
| max_buy_value = _safe_float((asset.liquidity or {}).get("max_buy_value_20d")) | |
| if max_buy_value <= 0: | |
| penalty += 0.25 | |
| continue | |
| if trade_amount > max_buy_value: | |
| penalty += min((trade_amount / max_buy_value - 1) * 0.08, 0.35) | |
| elif asset.asset_type == "FUND": | |
| terms = asset.liquidity or {} | |
| minimum = ( | |
| _safe_float(terms.get("min_initial")) | |
| if current <= 0 | |
| else _safe_float(terms.get("min_additional")) | |
| ) | |
| if minimum and trade_amount < minimum: | |
| penalty += 0.18 | |
| if not terms.get("prospectus_backed"): | |
| penalty += 0.08 | |
| if _safe_float(terms.get("lock_in_days")) > 0: | |
| penalty += min(_safe_float(terms.get("lock_in_days")) / 365 * 0.08, 0.08) | |
| return penalty | |
| def _apply_liquidity_caps( | |
| current_weights: np.ndarray, | |
| suggested_weights: np.ndarray, | |
| assets: list[OptimizerAsset], | |
| total_value: float, | |
| settings: dict[str, Any], | |
| ) -> tuple[np.ndarray, list[str]]: | |
| capped = suggested_weights.copy() | |
| warnings = [] | |
| excess = 0.0 | |
| for index, (current, suggested, asset) in enumerate(zip(current_weights, capped, assets)): | |
| if suggested <= current: | |
| continue | |
| if asset.asset_type == "STOCK": | |
| max_buy_value = _safe_float((asset.liquidity or {}).get("max_buy_value_20d")) | |
| if max_buy_value <= 0: | |
| max_weight = float(current) | |
| else: | |
| max_weight = float(current) + (max_buy_value / total_value if total_value else 0.0) | |
| if suggested > max_weight: | |
| excess += float(suggested - max_weight) | |
| capped[index] = max_weight | |
| warnings.append( | |
| f"{asset.symbol} increase was capped because DSE volume may not support the full trade size." | |
| ) | |
| elif asset.asset_type == "FUND": | |
| trade_amount = float(suggested - current) * total_value | |
| terms = asset.liquidity or {} | |
| minimum = ( | |
| _safe_float(terms.get("min_initial")) | |
| if current <= 0 | |
| else _safe_float(terms.get("min_additional")) | |
| ) | |
| if minimum and trade_amount < minimum: | |
| excess += float(capped[index] - current) | |
| capped[index] = 0.0 if current <= 0 else current | |
| warnings.append( | |
| f"{asset.symbol} buy was removed because it is below the prospectus minimum investment/addition." | |
| ) | |
| if excess <= 0: | |
| return capped, warnings | |
| effective_max_asset_weight = max(float(settings["max_asset_weight"]), 1 / len(assets)) | |
| receiver_order = {"FUND": 0, "BOND": 1, "STOCK": 2} | |
| def can_receive(index: int, asset: OptimizerAsset) -> bool: | |
| room = max(effective_max_asset_weight - float(capped[index]), 0.0) | |
| if room <= 0.0001: | |
| return False | |
| if asset.asset_type != "FUND": | |
| return True | |
| terms = asset.liquidity or {} | |
| minimum = ( | |
| _safe_float(terms.get("min_initial")) | |
| if capped[index] <= 0 | |
| else _safe_float(terms.get("min_additional")) | |
| ) | |
| return not minimum or room * total_value >= minimum | |
| receivers = sorted( | |
| [ | |
| (index, asset) | |
| for index, asset in enumerate(assets) | |
| if can_receive(index, asset) | |
| ], | |
| key=lambda item: (receiver_order.get(item[1].asset_type, 9), -item[1].expected_return), | |
| ) | |
| for index, _asset in receivers: | |
| if excess <= 0: | |
| break | |
| room = max(effective_max_asset_weight - float(capped[index]), 0.0) | |
| addition = min(room, excess) | |
| capped[index] += addition | |
| excess -= addition | |
| if capped.sum() > 0: | |
| capped = capped / capped.sum() | |
| return capped, warnings | |
| def _fee_breakdown( | |
| current_weights: np.ndarray, | |
| suggested_weights: np.ndarray, | |
| assets: list[OptimizerAsset], | |
| total_value: float, | |
| ) -> list[dict[str, Any]]: | |
| items = [] | |
| for current, suggested, asset in zip(current_weights, suggested_weights, assets): | |
| difference = float(suggested - current) | |
| trade_amount = abs(difference) * total_value | |
| if trade_amount <= 0: | |
| continue | |
| side = "BUY" if difference > 0 else "SELL" | |
| fee_rate = 0.0 | |
| fee_source = "No explicit transaction fee" | |
| fee_components: dict[str, float] = {} | |
| if asset.asset_type == "STOCK": | |
| band = _stock_fee_band(trade_amount) | |
| if band: | |
| fee_components = { | |
| "brokerage_commission": round(trade_amount * (_safe_float(band.get("brokerage_commission")) / 100), 2), | |
| "transaction_fee_cmsa": round(trade_amount * (_safe_float(band.get("transaction_fee_cmsa")) / 100), 2), | |
| "transaction_fee_dse": round(trade_amount * (_safe_float(band.get("transaction_fee_dse")) / 100), 2), | |
| "fidelity_fee": round(trade_amount * (_safe_float(band.get("fidelity_fee")) / 100), 2), | |
| "csd_fee": round(trade_amount * (_safe_float(band.get("csd_fee")) / 100), 2), | |
| } | |
| fee_rate = _safe_float(band.get("total_cost_to_investor")) / 100 | |
| fee_source = f"DSE equity fee band: {band.get('label')}" | |
| else: | |
| fee_rate = asset.fee_rate | |
| fee_source = "DSE equity fee fallback" | |
| elif asset.asset_type == "FUND" and side == "BUY": | |
| fee_rate = asset.fee_rate | |
| fee_source = "Fund entry load" | |
| elif asset.asset_type == "FUND" and side == "SELL": | |
| fee_rate = _safe_float((asset.liquidity or {}).get("exit_fee_rate")) | |
| fee_source = "Fund exit load from prospectus metadata" | |
| if fee_rate <= 0: | |
| continue | |
| fee = trade_amount * fee_rate | |
| items.append( | |
| { | |
| "asset_id": asset.asset_id, | |
| "asset_type": asset.asset_type, | |
| "symbol": asset.symbol, | |
| "name": asset.name, | |
| "side": side, | |
| "trade_amount": round(trade_amount, 2), | |
| "buy_amount": round(trade_amount if side == "BUY" else 0.0, 2), | |
| "sell_amount": round(trade_amount if side == "SELL" else 0.0, 2), | |
| "fee_rate": round(fee_rate, 6), | |
| "estimated_fee": round(fee, 2), | |
| "fee_components": fee_components, | |
| "fee_source": fee_source, | |
| } | |
| ) | |
| return sorted(items, key=lambda item: item["estimated_fee"], reverse=True) | |
| def _fee_summary(fee_items: list[dict[str, Any]]) -> dict[str, Any]: | |
| by_type: dict[str, float] = {} | |
| for item in fee_items: | |
| by_type[item["asset_type"]] = by_type.get(item["asset_type"], 0.0) + item["estimated_fee"] | |
| total = sum(by_type.values()) | |
| return { | |
| "total": round(total, 2), | |
| "by_asset_type": {key: round(value, 2) for key, value in by_type.items()}, | |
| "items": fee_items, | |
| } | |
| def _advisor_comment( | |
| current_stats: dict[str, float], | |
| suggested_stats: dict[str, float], | |
| allocations: list[dict[str, Any]], | |
| estimated_fees: float, | |
| risk_label: str, | |
| ) -> dict[str, Any]: | |
| increases = [item for item in allocations if item["difference"] > 0.03] | |
| reductions = [item for item in allocations if item["difference"] < -0.03] | |
| top_increase = increases[0] if increases else None | |
| new_buys = [item for item in increases if item.get("is_new_candidate")] | |
| top_reduction = sorted(reductions, key=lambda item: item["difference"])[0] if reductions else None | |
| sharpe_delta = suggested_stats["sharpe"] - current_stats["sharpe"] | |
| return_delta = suggested_stats["expected_return"] - current_stats["expected_return"] | |
| summary = ( | |
| f"The suggested allocation is tuned for a {risk_label.lower()} risk setting while trying " | |
| "to avoid one position carrying too much of the risk." | |
| ) | |
| if sharpe_delta > 0.05: | |
| summary = ( | |
| "The suggested allocation looks stronger on a risk-adjusted basis: it " | |
| "improves the expected Sharpe score while keeping the portfolio growth-focused." | |
| ) | |
| elif return_delta > 0: | |
| summary = ( | |
| "The suggested allocation mainly improves expected growth, but the risk-adjusted " | |
| "improvement is modest, so rebalance gradually rather than all at once." | |
| ) | |
| actions = [] | |
| if top_increase: | |
| if top_increase.get("is_new_candidate"): | |
| actions.append( | |
| f"Buy {top_increase['symbol']} toward {top_increase['suggested_weight'] * 100:.1f}% because it passed the return, valuation, fee, and liquidity screens." | |
| ) | |
| else: | |
| actions.append( | |
| f"Increase {top_increase['symbol']} toward {top_increase['suggested_weight'] * 100:.1f}% because its growth return still looks attractive after estimated fees." | |
| ) | |
| if new_buys and (not top_increase or not top_increase.get("is_new_candidate")): | |
| actions.append( | |
| "Add selected new stock/fund candidates only where observed trading volume can support the buy size." | |
| ) | |
| if top_reduction: | |
| actions.append( | |
| f"Reduce {top_reduction['symbol']} toward {top_reduction['suggested_weight'] * 100:.1f}% to lower concentration or weaker risk-adjusted exposure." | |
| ) | |
| if estimated_fees > 0: | |
| actions.append( | |
| f"Estimated rebalance fees are about TZS {estimated_fees:,.0f}, so the changes should be made only if the expected improvement is worth that cost." | |
| ) | |
| if not actions: | |
| actions.append("No major rebalance is needed; current weights are close to the suggested growth allocation.") | |
| return { | |
| "title": "Growth allocation view", | |
| "summary": summary, | |
| "key_actions": actions[:3], | |
| "caution": ( | |
| "This is an allocation model, not a guarantee. It uses historical prices, NAVs, " | |
| "dividends, fund payout flags, bond coupons, cached fundamentals, observed stock volume, and known transaction costs; financial statement quality and future news still need human review." | |
| ), | |
| } | |
| def _bond_execution_note(asset: OptimizerAsset) -> str: | |
| liquidity = asset.liquidity or {} | |
| secondary = liquidity.get("comparable_secondary_market") or [] | |
| primary = liquidity.get("comparable_primary_market") or [] | |
| parts = [] | |
| if liquidity.get("same_bond_secondary_available"): | |
| parts.append( | |
| f"same bond last traded {liquidity.get('latest_trade_date')} at yield " | |
| f"{_safe_float(liquidity.get('latest_yield_percent')):.2f}%" | |
| ) | |
| elif secondary: | |
| median_yield = liquidity.get("comparable_median_yield_percent") | |
| parts.append( | |
| f"no same-bond print; comparable secondary prints show median yield " | |
| f"{_safe_float(median_yield):.2f}%" | |
| ) | |
| else: | |
| parts.append("no fresh same-bond or comparable secondary print is stored") | |
| if primary: | |
| top = primary[0] | |
| parts.append( | |
| f"recent primary auction reference {top.get('bond_no')} " | |
| f"from {top.get('auction_date')} " | |
| f"({top.get('maturity_years')}y) approximate reinvestment yield " | |
| f"{_safe_float(top.get('implied_yield_percent')):.2f}% at price " | |
| f"{_safe_float(top.get('price_per_100')):.2f}" | |
| ) | |
| else: | |
| parts.append("no recent comparable primary auction reference is stored") | |
| caveat = liquidity.get("execution_caveat") | |
| if caveat: | |
| parts.append(str(caveat)) | |
| return "Secondary/primary market check: " + "; ".join(parts) | |
| def _bond_return_worth_note(asset: OptimizerAsset) -> str: | |
| liquidity = asset.liquidity or {} | |
| net_return = asset.expected_return - asset.fee_rate | |
| hurdle = DEFAULT_RISK_FREE_RATE | |
| spread = net_return - hurdle | |
| price_percent = _safe_float(liquidity.get("price_percent"), asset.current_price * 100) | |
| latest_yield = _safe_float(liquidity.get("latest_yield_percent"), net_return * 100) | |
| income_yield = asset.income_yield * 100 | |
| verdict = "adequate" if spread >= 0.015 else "thin" if spread >= 0 else "weak" | |
| premium_note = ( | |
| f" premium price {price_percent:.2f}% means coupon income is not the same as total return;" | |
| if price_percent > 110 | |
| else "" | |
| ) | |
| return ( | |
| f"Return worth check: {verdict}; market yield {latest_yield:.2f}% " | |
| f"vs {hurdle * 100:.2f}% hurdle ({spread * 100:+.2f}%), " | |
| f"income yield on invested price about {income_yield:.2f}%.{premium_note}" | |
| ) | |
| def _bond_coupon_advantage(asset: OptimizerAsset) -> float: | |
| liquidity = asset.liquidity or {} | |
| coupon = _safe_float(liquidity.get("coupon_rate_percent")) / 100 | |
| reinvestment_yield = _safe_float(liquidity.get("primary_reinvestment_yield_percent")) / 100 | |
| if coupon <= 0 or reinvestment_yield <= 0: | |
| return 0.0 | |
| return coupon - reinvestment_yield | |
| def _is_irreplaceable_income_bond(asset: OptimizerAsset) -> bool: | |
| return ( | |
| asset.asset_type == "BOND" | |
| and _bond_coupon_advantage(asset) >= HIGH_COUPON_ADVANTAGE_BPS / 10_000 | |
| ) | |
| def _has_rock_solid_fundamentals(asset: OptimizerAsset) -> bool: | |
| fundamentals = asset.fundamentals or {} | |
| if asset.asset_type != "STOCK": | |
| return False | |
| pe_ratio = _safe_float(fundamentals.get("pe_ratio")) | |
| pb_ratio = _safe_float(fundamentals.get("pb_ratio")) | |
| eps = _safe_float(fundamentals.get("eps")) | |
| debt_to_equity = _safe_float(fundamentals.get("debt_to_equity"), default=-1) | |
| return ( | |
| eps > 0 | |
| and 0 < pe_ratio <= 22 | |
| and 0 < pb_ratio <= 3.5 | |
| and 0 <= debt_to_equity <= 150 | |
| ) | |
| def _has_visible_macro_tailwind(asset: OptimizerAsset) -> bool: | |
| data = {**(asset.fundamentals or {}), **(asset.liquidity or {})} | |
| return bool( | |
| data.get("macro_tailwind") | |
| or data.get("sector_tailwind") | |
| or data.get("rate_tailwind") | |
| or str(data.get("macro_context") or "").lower() in {"tailwind", "supportive", "positive"} | |
| ) | |
| def _clean_exit_liquidity(asset: OptimizerAsset, trade_value: float) -> bool: | |
| liquidity = asset.liquidity or {} | |
| if asset.asset_type == "STOCK": | |
| max_buy_value = _safe_float(liquidity.get("max_buy_value_20d")) | |
| return max_buy_value > 0 and trade_value <= max_buy_value | |
| if asset.asset_type == "FUND": | |
| redemption_days = _safe_float(liquidity.get("redemption_days"), default=999) | |
| return redemption_days <= 7 and bool(liquidity.get("prospectus_backed")) | |
| return False | |
| def _replacement_quality_for_irreplaceable_bond( | |
| bond: OptimizerAsset, | |
| assets: list[OptimizerAsset], | |
| suggested_weights: np.ndarray, | |
| current_weights: np.ndarray, | |
| total_value: float, | |
| ) -> tuple[bool, list[str]]: | |
| coupon_advantage = _bond_coupon_advantage(bond) | |
| required_excess = coupon_advantage + LEGACY_BOND_REPLACEMENT_MARGIN | |
| increased_assets = [ | |
| (asset, float(suggested - current) * total_value) | |
| for asset, suggested, current in zip(assets, suggested_weights, current_weights) | |
| if asset.key != bond.key and suggested > current + 0.01 | |
| ] | |
| failures: list[str] = [] | |
| for asset, trade_value in increased_assets: | |
| net_return_advantage = asset.expected_return - asset.fee_rate - bond.income_yield | |
| gates = { | |
| "return_margin": net_return_advantage >= required_excess, | |
| "low_moderate_volatility": asset.volatility <= LOW_MODERATE_VOLATILITY_MAX, | |
| "rock_solid_fundamentals": _has_rock_solid_fundamentals(asset), | |
| "macro_tailwind": _has_visible_macro_tailwind(asset), | |
| "history_and_liquidity": ( | |
| _safe_float((asset.liquidity or {}).get("history_years")) >= MIN_REPLACEMENT_HISTORY_YEARS | |
| and _clean_exit_liquidity(asset, trade_value) | |
| ), | |
| } | |
| if all(gates.values()): | |
| return True, [] | |
| failed = ", ".join(name for name, passed in gates.items() if not passed) | |
| failures.append(f"{asset.symbol} failed replacement gates: {failed}") | |
| if not failures: | |
| failures.append("No increased replacement asset was available for the income stream.") | |
| return False, failures | |
| def _protect_irreplaceable_bonds( | |
| current_weights: np.ndarray, | |
| suggested_weights: np.ndarray, | |
| assets: list[OptimizerAsset], | |
| total_value: float, | |
| ) -> tuple[np.ndarray, list[str]]: | |
| protected = suggested_weights.copy() | |
| warnings: list[str] = [] | |
| for index, asset in enumerate(assets): | |
| if not _is_irreplaceable_income_bond(asset) or protected[index] >= current_weights[index]: | |
| continue | |
| qualified, failures = _replacement_quality_for_irreplaceable_bond( | |
| asset, | |
| assets, | |
| protected, | |
| current_weights, | |
| total_value, | |
| ) | |
| if qualified: | |
| continue | |
| restored = float(current_weights[index] - protected[index]) | |
| protected[index] = current_weights[index] | |
| warnings.append( | |
| f"{asset.symbol} reduction blocked: coupon materially exceeds recent primary reinvestment yield, so default is hold to maturity unless all replacement gates pass. " | |
| + " ".join(failures[:2]) | |
| ) | |
| receiver_indices = [ | |
| i | |
| for i, (current, suggested) in enumerate(zip(current_weights, protected)) | |
| if i != index and suggested > current | |
| ] | |
| receiver_total = sum(float(protected[i] - current_weights[i]) for i in receiver_indices) | |
| if receiver_total > 0: | |
| for i in receiver_indices: | |
| reduction = restored * float(protected[i] - current_weights[i]) / receiver_total | |
| protected[i] = max(current_weights[i], protected[i] - reduction) | |
| total = float(protected.sum()) | |
| if total > 0: | |
| protected = protected / total | |
| return protected, warnings | |
| def _optimize_sync( | |
| assets_payload: list[dict[str, Any]], | |
| simulations: int, | |
| risk_profile: str | None, | |
| risk_score: int | None, | |
| alternatives: list[dict[str, Any]], | |
| igrowth_context: dict[str, Any] | None = None, | |
| ) -> dict[str, Any]: | |
| assets = [OptimizerAsset(**payload) for payload in assets_payload] | |
| settings = _risk_settings(risk_profile, risk_score) | |
| total_value = sum(asset.current_value for asset in assets) | |
| current_weights = np.array([asset.current_value / total_value for asset in assets]) | |
| returns = np.array([asset.expected_return for asset in assets]) | |
| vols = np.array([asset.volatility for asset in assets]) | |
| income_yields = np.array([asset.income_yield for asset in assets]) | |
| best_weights = current_weights.copy() | |
| best_score = -float("inf") | |
| rng = np.random.default_rng(42) | |
| attempts = max(simulations, 1000) | |
| for _ in range(attempts): | |
| weights = rng.dirichlet(np.ones(len(assets))) | |
| if not _respects_limits(weights, assets, settings): | |
| continue | |
| stats = _portfolio_stats(weights, returns, vols) | |
| fees = _estimate_rebalance_fees(current_weights, weights, assets, total_value) | |
| fee_drag = fees / total_value if total_value else 0.0 | |
| income_score = float(np.dot(weights, income_yields)) | |
| liquidity_penalty = _liquidity_capacity_penalty(current_weights, weights, assets, total_value) | |
| score = ( | |
| stats["sharpe"] | |
| + settings["return_weight"] * stats["expected_return"] | |
| + settings["income_weight"] * income_score | |
| - settings["volatility_penalty"] * stats["volatility"] | |
| - fee_drag | |
| - liquidity_penalty | |
| ) | |
| if score > best_score: | |
| best_score = score | |
| best_weights = weights | |
| best_weights, liquidity_warnings = _apply_liquidity_caps( | |
| current_weights, | |
| best_weights, | |
| assets, | |
| total_value, | |
| settings, | |
| ) | |
| best_weights, bond_protection_warnings = _protect_irreplaceable_bonds( | |
| current_weights, | |
| best_weights, | |
| assets, | |
| total_value, | |
| ) | |
| liquidity_warnings.extend(bond_protection_warnings) | |
| liquidity_warnings.extend( | |
| _liquidity_trade_warnings(current_weights, best_weights, assets, total_value) | |
| ) | |
| current_stats = _portfolio_stats(current_weights, returns, vols) | |
| suggested_stats = _portfolio_stats(best_weights, returns, vols) | |
| estimated_fees = _estimate_rebalance_fees( | |
| current_weights, best_weights, assets, total_value | |
| ) | |
| fee_items = _fee_breakdown(current_weights, best_weights, assets, total_value) | |
| fee_summary = _fee_summary(fee_items) | |
| allocations = [] | |
| for asset, current_weight, suggested_weight in zip( | |
| assets, current_weights, best_weights | |
| ): | |
| difference = float(suggested_weight - current_weight) | |
| reason = "Hold allocation" | |
| if difference > 0.03: | |
| reason = ( | |
| "Buy new candidate after valuation, liquidity, fees, and prospectus checks" | |
| if asset.is_candidate | |
| else "Increase for growth-adjusted return after fees" | |
| ) | |
| if asset.asset_type == "BOND": | |
| reason = f"{reason}. {_bond_return_worth_note(asset)} {_bond_execution_note(asset)}" | |
| elif difference < -0.03: | |
| reason = "Reduce concentration or weaker risk-adjusted return" | |
| if asset.asset_type == "BOND": | |
| reason = f"{reason}. {_bond_return_worth_note(asset)} {_bond_execution_note(asset)}" | |
| elif asset.asset_type == "BOND": | |
| if _is_irreplaceable_income_bond(asset): | |
| reason = ( | |
| "Hold to maturity by default: coupon materially exceeds recent primary reinvestment yields, " | |
| "and no replacement passed all return, volatility, fundamentals, macro, history, and liquidity gates" | |
| ) | |
| reason = f"{reason}. {_bond_return_worth_note(asset)} {_bond_execution_note(asset)}" | |
| allocations.append( | |
| { | |
| "asset_id": asset.asset_id, | |
| "asset_type": asset.asset_type, | |
| "symbol": asset.symbol, | |
| "name": asset.name, | |
| "current_weight": round(float(current_weight), 4), | |
| "suggested_weight": round(float(suggested_weight), 4), | |
| "difference": round(difference, 4), | |
| "current_value": round(asset.current_value, 2), | |
| "suggested_value": round(float(suggested_weight) * total_value, 2), | |
| "rebalance_amount": round(difference * total_value, 2), | |
| "expected_return": round(asset.expected_return, 4), | |
| "volatility": round(asset.volatility, 4), | |
| "income_yield": round(asset.income_yield, 4), | |
| "fee_rate": round(asset.fee_rate, 4), | |
| "is_new_candidate": asset.is_candidate, | |
| "fundamentals": asset.fundamentals, | |
| "liquidity": asset.liquidity, | |
| "reason": reason, | |
| } | |
| ) | |
| sorted_allocations = sorted( | |
| allocations, | |
| key=lambda item: item["suggested_weight"], | |
| reverse=True, | |
| ) | |
| return { | |
| "objective": "growth", | |
| "risk_profile": settings["key"], | |
| "risk_label": settings["label"], | |
| "risk_score": settings["risk_score"], | |
| "total_value": round(total_value, 2), | |
| "risk_free_rate": DEFAULT_RISK_FREE_RATE, | |
| "constraints": { | |
| "max_asset_weight": max(float(settings["max_asset_weight"]), 1 / len(assets)), | |
| "class_limits": settings["class_limits"], | |
| "class_limits_apply_only_when_multiple_asset_classes_exist": True, | |
| }, | |
| "current": {k: round(v, 4) for k, v in current_stats.items()}, | |
| "suggested": {k: round(v, 4) for k, v in suggested_stats.items()}, | |
| "estimated_rebalance_fees": round(estimated_fees, 2), | |
| "fee_breakdown": fee_summary, | |
| "liquidity_warnings": liquidity_warnings, | |
| "igrowth_proxy_comparison": igrowth_context, | |
| "advisor_comment": _advisor_comment( | |
| current_stats, | |
| suggested_stats, | |
| sorted_allocations, | |
| estimated_fees, | |
| settings["label"], | |
| ), | |
| "allocations": sorted_allocations, | |
| "alternatives": alternatives, | |
| "notes": [ | |
| "Suggested allocations can include new stock and fund candidates when they pass valuation, liquidity, fee, and risk checks.", | |
| "Stock buy fee estimates use the stored DSE transaction fee bands and stock increases are capped by observed DSE turnover capacity.", | |
| "Stock dividends, fund income flags, bond coupons, and fund entry loads are included where available.", | |
| "Stock fundamentals use cached Simply Wall St snapshots when available; missing EPS, PE, or PB values are not guessed.", | |
| "Optimization uses Monte Carlo simulation to stay lightweight on Hugging Face.", | |
| ], | |
| } | |
| async def analyze_portfolio_growth_allocation( | |
| portfolio_id: int, | |
| simulations: int = DEFAULT_SIMULATIONS, | |
| risk_profile: str | None = None, | |
| risk_score: int | None = None, | |
| ) -> dict[str, Any]: | |
| current_assets = await _build_assets(portfolio_id) | |
| if not current_assets: | |
| raise ValueError("At least one priced asset is required for allocation analysis") | |
| alternatives = await _suggest_alternatives(current_assets, risk_profile, risk_score) | |
| settings = _risk_settings(risk_profile, risk_score) | |
| total_value = sum(asset.current_value for asset in current_assets) | |
| igrowth_context = await _igrowth_proxy_context(current_assets, total_value) | |
| candidate_assets = await _build_new_candidate_assets(current_assets, risk_profile, risk_score) | |
| assets = current_assets + candidate_assets | |
| if len(assets) == 1: | |
| asset = assets[0] | |
| current_stats = { | |
| "expected_return": asset.expected_return, | |
| "volatility": asset.volatility, | |
| "sharpe": ( | |
| (asset.expected_return - DEFAULT_RISK_FREE_RATE) / asset.volatility | |
| if asset.volatility > 0 | |
| else 0.0 | |
| ), | |
| } | |
| allocation = { | |
| "asset_id": asset.asset_id, | |
| "asset_type": asset.asset_type, | |
| "symbol": asset.symbol, | |
| "name": asset.name, | |
| "current_weight": 1.0, | |
| "suggested_weight": 1.0, | |
| "difference": 0.0, | |
| "current_value": round(asset.current_value, 2), | |
| "suggested_value": round(asset.current_value, 2), | |
| "rebalance_amount": 0.0, | |
| "expected_return": round(asset.expected_return, 4), | |
| "volatility": round(asset.volatility, 4), | |
| "fee_rate": round(asset.fee_rate, 4), | |
| "reason": "Only priced asset in portfolio; add other assets before optimization can rebalance.", | |
| } | |
| return { | |
| "objective": "growth", | |
| "risk_profile": settings["key"], | |
| "risk_label": settings["label"], | |
| "risk_score": settings["risk_score"], | |
| "total_value": round(asset.current_value, 2), | |
| "risk_free_rate": DEFAULT_RISK_FREE_RATE, | |
| "constraints": { | |
| "max_asset_weight": 1.0, | |
| "class_limits": GROWTH_CLASS_LIMITS, | |
| "class_limits_apply_only_when_multiple_asset_classes_exist": True, | |
| }, | |
| "current": {k: round(v, 4) for k, v in current_stats.items()}, | |
| "suggested": {k: round(v, 4) for k, v in current_stats.items()}, | |
| "estimated_rebalance_fees": 0.0, | |
| "fee_breakdown": _fee_summary([]), | |
| "advisor_comment": { | |
| "title": "Concentration warning", | |
| "summary": ( | |
| f"Your portfolio is currently 100% allocated to {asset.symbol}. " | |
| "That may have worked well, but the growth engine is concentrated in one security, " | |
| "so the next improvement is diversification rather than reweighting." | |
| ), | |
| "key_actions": [ | |
| "Add at least one more priced asset before running a full optimizer.", | |
| "For a growth portfolio, consider blending stocks with a liquid fund, ETF, or bond exposure to reduce single-name risk.", | |
| "Keep fees in mind: adding a new DSE stock position has transaction costs, so diversify in meaningful ticket sizes.", | |
| ], | |
| "caution": ( | |
| "The model cannot calculate an optimized allocation from one asset alone. " | |
| "It can only flag concentration risk and explain what data is missing." | |
| ), | |
| }, | |
| "allocations": [allocation], | |
| "alternatives": alternatives, | |
| "notes": [ | |
| "Single-asset portfolios receive a concentration analysis instead of a failed optimization.", | |
| "Add at least two priced assets to enable portfolio rebalancing advice.", | |
| ], | |
| } | |
| return await asyncio.to_thread( | |
| _optimize_sync, | |
| [asset.__dict__ for asset in assets], | |
| simulations, | |
| risk_profile, | |
| risk_score, | |
| alternatives, | |
| igrowth_context, | |
| ) | |
| async def run_portfolio_analysis_task( | |
| task_id: int, | |
| portfolio_id: int, | |
| simulations: int = DEFAULT_SIMULATIONS, | |
| risk_profile: str | None = None, | |
| risk_score: int | None = None, | |
| ) -> None: | |
| await ImportTask.filter(id=task_id).update( | |
| status=TaskStatus.RUNNING, | |
| details={ | |
| "portfolio_id": portfolio_id, | |
| "risk_profile": _risk_settings(risk_profile, risk_score)["key"], | |
| "risk_score": _risk_settings(risk_profile, risk_score)["risk_score"], | |
| "status_message": "Portfolio growth allocation analysis is running", | |
| }, | |
| ) | |
| try: | |
| result = await analyze_portfolio_growth_allocation(portfolio_id, simulations, risk_profile, risk_score) | |
| except Exception as exc: | |
| await ImportTask.filter(id=task_id).update( | |
| status=TaskStatus.FAILED, | |
| details={ | |
| "portfolio_id": portfolio_id, | |
| "error": str(exc), | |
| }, | |
| ) | |
| return | |
| await ImportTask.filter(id=task_id).update( | |
| status=TaskStatus.COMPLETED, | |
| details={ | |
| "portfolio_id": portfolio_id, | |
| "result": result, | |
| }, | |
| ) | |