InvestingTest / App /analysis /portfolio_optimizer.py
Mbonea's picture
Protect high coupon bond income streams
3adca8d
import asyncio
from dataclasses import dataclass
from dataclasses import replace
from datetime import date, timedelta
from decimal import Decimal, InvalidOperation
import re
from typing import Any
import numpy as np
from App.analysis.volatility import calculate_annualized_volatility
from App.routers.bonds.models import Bond
from App.routers.economy.valuation import get_bond_pricing_context
from App.routers.funds.models import FundPerformance, MutualFund
from App.routers.stocks.metrics import calculate_metrics, calculate_liquidity_metrics
from App.routers.stocks.models import Stock, StockPriceData
from App.routers.stocks.seed import load_dse_transaction_fee_seed_data
from App.routers.tasks.models import ImportTask, TaskStatus
GROWTH_CLASS_LIMITS = {
"STOCK": (0.30, 0.70),
"FUND": (0.10, 0.45),
"BOND": (0.05, 0.35),
}
MAX_ASSET_WEIGHT = 0.35
RISK_PROFILES = {
"conservative": {
"label": "Conservative",
"class_limits": {"STOCK": (0.00, 0.35), "FUND": (0.25, 0.70), "BOND": (0.15, 0.60)},
"max_asset_weight": 0.25,
"return_weight": 0.55,
"volatility_penalty": 1.35,
"income_weight": 0.45,
},
"balanced": {
"label": "Balanced",
"class_limits": GROWTH_CLASS_LIMITS,
"max_asset_weight": MAX_ASSET_WEIGHT,
"return_weight": 0.85,
"volatility_penalty": 0.85,
"income_weight": 0.25,
},
"growth": {
"label": "Growth",
"class_limits": {"STOCK": (0.30, 0.75), "FUND": (0.05, 0.45), "BOND": (0.00, 0.25)},
"max_asset_weight": 0.40,
"return_weight": 1.15,
"volatility_penalty": 0.45,
"income_weight": 0.10,
},
}
DEFAULT_RISK_FREE_RATE = 0.12
DEFAULT_SIMULATIONS = 6000
HIGH_COUPON_ADVANTAGE_BPS = 150
LEGACY_BOND_REPLACEMENT_MARGIN = 0.06
LOW_MODERATE_VOLATILITY_MAX = 0.16
MIN_REPLACEMENT_HISTORY_YEARS = 3.0
@dataclass
class OptimizerAsset:
key: str
asset_id: int
asset_type: str
symbol: str
name: str
quantity: float
current_price: float
current_value: float
expected_return: float
volatility: float
income_yield: float
fee_rate: float
fundamentals: dict[str, Any] | None = None
liquidity: dict[str, Any] | None = None
is_candidate: bool = False
def _blend_pair(start: tuple[float, float], end: tuple[float, float], t: float) -> tuple[float, float]:
return (
start[0] + (end[0] - start[0]) * t,
start[1] + (end[1] - start[1]) * t,
)
def _risk_settings(risk_profile: str | None, risk_score: int | None = None) -> dict[str, Any]:
if risk_score is not None:
score = max(0, min(100, int(risk_score)))
t = score / 100
low = RISK_PROFILES["conservative"]
high = RISK_PROFILES["growth"]
if score < 34:
label = "Conservative"
elif score > 66:
label = "Aggressive"
else:
label = "Balanced"
return {
"key": "conservative" if score < 34 else "growth" if score > 66 else "balanced",
"label": label,
"risk_score": score,
"class_limits": {
asset_type: _blend_pair(low["class_limits"][asset_type], high["class_limits"][asset_type], t)
for asset_type in GROWTH_CLASS_LIMITS
},
"max_asset_weight": low["max_asset_weight"] + (high["max_asset_weight"] - low["max_asset_weight"]) * t,
"return_weight": low["return_weight"] + (high["return_weight"] - low["return_weight"]) * t,
"volatility_penalty": low["volatility_penalty"] + (high["volatility_penalty"] - low["volatility_penalty"]) * t,
"income_weight": low["income_weight"] + (high["income_weight"] - low["income_weight"]) * t,
}
key = str(risk_profile or "balanced").strip().lower()
if key in {"low", "safe", "safer", "defensive"}:
key = "conservative"
elif key in {"high", "aggressive"}:
key = "growth"
settings = RISK_PROFILES.get(key, RISK_PROFILES["balanced"])
fallback_score = 15 if key == "conservative" else 85 if key == "growth" else 50
return {"key": key if key in RISK_PROFILES else "balanced", "risk_score": fallback_score, **settings}
def _safe_float(value: Any, default: float = 0.0) -> float:
if value in (None, ""):
return default
try:
return float(value)
except (TypeError, ValueError, InvalidOperation):
return default
def _annualized_return(values: list[tuple[date, float]]) -> float | None:
ordered = [(d, v) for d, v in sorted(values, key=lambda item: item[0]) if v > 0]
if len(ordered) < 2:
return None
start_date, start_value = ordered[0]
end_date, end_value = ordered[-1]
days = max((end_date - start_date).days, 1)
total_return = (end_value / start_value) - 1
return (1 + total_return) ** (365 / days) - 1
def _history_years(values: list[tuple[date, float]]) -> float:
ordered = [item for item in sorted(values, key=lambda item: item[0]) if item[1] > 0]
if len(ordered) < 2:
return 0.0
return max((ordered[-1][0] - ordered[0][0]).days / 365.25, 0.0)
def _stock_fee_band(consideration: float) -> dict[str, Any] | None:
payload = load_dse_transaction_fee_seed_data()
for band in payload.get("bands", []):
min_value = _safe_float(band.get("min_consideration"))
max_raw = band.get("max_consideration")
max_value = float("inf") if max_raw is None else _safe_float(max_raw)
if consideration > min_value and consideration <= max_value:
return band
return None
def _stock_fee_rate(consideration: float) -> float:
band = _stock_fee_band(consideration)
if band:
return _safe_float(band.get("total_cost_to_investor")) / 100
return 0.0206
def _parse_percent(raw_value: Any) -> float:
if raw_value is None:
return 0.0
text = str(raw_value)
digits = "".join(ch for ch in text if ch.isdigit() or ch == ".")
return _safe_float(digits) / 100 if digits else 0.0
def _parse_distribution_rate(raw_value: Any) -> float:
text = str(raw_value or "")
matches = re.findall(r"(\d+(?:\.\d+)?)\s*%", text)
if not matches:
return 0.0
return max(_safe_float(match) / 100 for match in matches)
def _parse_money_value(raw_value: Any) -> float | None:
if raw_value in (None, ""):
return None
digits = "".join(ch for ch in str(raw_value) if ch.isdigit() or ch == ".")
return _safe_float(digits) if digits else None
def _parse_fee_rate(raw_value: Any) -> float:
text = str(raw_value or "").lower()
if not text or "none" in text or "nil" in text:
return 0.0
match = re.search(r"(\d+(?:\.\d+)?)\s*%", text)
return _safe_float(match.group(1)) / 100 if match else 0.0
async def _fund_prospectus_terms(fund: MutualFund) -> dict[str, Any]:
try:
info = await fund.info_record
except Exception:
info = None
raw_data = info.raw_data if info and isinstance(info.raw_data, dict) else {}
liquidity = raw_data.get("liquidity") if isinstance(raw_data.get("liquidity"), dict) else {}
contributions = raw_data.get("contributions") if isinstance(raw_data.get("contributions"), dict) else {}
documents = raw_data.get("documents") if isinstance(raw_data.get("documents"), list) else []
min_initial = (
_parse_money_value(raw_data.get("min_initial"))
or _parse_money_value(fund.min_initial)
or _parse_money_value(contributions.get("min_initial"))
)
min_additional = (
_parse_money_value(raw_data.get("min_additional"))
or _parse_money_value(fund.min_additional)
or _parse_money_value(contributions.get("min_additional"))
)
min_redemption = (
_parse_money_value(raw_data.get("min_redemption"))
or _parse_money_value(liquidity.get("min_redemption"))
)
lock_in_days = raw_data.get("lock_in_days") or liquidity.get("lock_in_days")
redemption_days = fund.redemption_days or raw_data.get("redemption_days") or liquidity.get("redemption_days")
exit_fee_rate = _parse_fee_rate(fund.exit_load or raw_data.get("exit_load") or liquidity.get("exit_fee"))
entry_fee_rate = _safe_float(fund.entry_load) / 100
return {
"prospectus_backed": bool(raw_data or documents),
"prospectus_documents": [
item.get("path") or item.get("name")
for item in documents
if isinstance(item, dict) and (item.get("path") or item.get("name"))
],
"min_initial": min_initial,
"min_additional": min_additional,
"min_redemption": min_redemption,
"lock_in_days": int(lock_in_days) if str(lock_in_days or "").isdigit() else None,
"redemption_days": int(redemption_days) if str(redemption_days or "").isdigit() else redemption_days,
"entry_fee_rate": entry_fee_rate,
"exit_fee_rate": exit_fee_rate,
"exit_load": fund.exit_load,
"fund_type": fund.fund_type,
"currency": fund.currency,
}
def _fundamental_adjusted_stock_return(
base_return: float,
dividend_yield: float,
fundamentals: dict[str, Any],
liquidity: dict[str, Any],
) -> float:
pe_ratio = _safe_float(fundamentals.get("pe_ratio"))
pb_ratio = _safe_float(fundamentals.get("pb_ratio"))
eps = fundamentals.get("eps")
payout_ratio = _safe_float(fundamentals.get("payout_ratio"))
debt_to_equity = _safe_float(fundamentals.get("debt_to_equity"))
liquidity_score = _safe_float(liquidity.get("liquidity_score"))
adjusted = base_return
if pe_ratio > 0:
earnings_yield = 1 / pe_ratio
adjusted = (adjusted * 0.75) + ((earnings_yield + dividend_yield) * 0.25)
if pe_ratio > 30:
adjusted -= 0.035
elif pe_ratio > 22:
adjusted -= 0.015
elif pe_ratio < 10:
adjusted += 0.01
elif eps not in (None, ""):
adjusted -= 0.07
else:
adjusted -= 0.015
if pb_ratio > 0:
if pb_ratio > 5:
adjusted -= 0.05
elif pb_ratio > 3:
adjusted -= 0.025
elif pb_ratio <= 2:
adjusted += 0.01
if payout_ratio > 100:
adjusted -= 0.025
if debt_to_equity > 150:
adjusted -= 0.015
if liquidity_score < 25:
adjusted -= 0.05
elif liquidity_score < 45:
adjusted -= 0.025
elif liquidity_score > 75:
adjusted += 0.01
return adjusted
async def _fund_income_yield(fund: MutualFund) -> float:
if not fund.pays_income:
return 0.0
try:
info = await fund.info_record
except Exception:
info = None
data = info.raw_data if info and isinstance(info.raw_data, dict) else {}
distribution = data.get("distribution") if isinstance(data.get("distribution"), dict) else {}
other_facts = data.get("other_facts") if isinstance(data.get("other_facts"), dict) else {}
candidates = [
fund.income_amount,
distribution.get("max_distribution_rate"),
distribution.get("policy"),
other_facts.get("max_distribution_rate"),
]
return min(max((_parse_distribution_rate(value) for value in candidates), default=0.0), 0.18)
async def _collect_positions(portfolio_id: int) -> list[dict[str, Any]]:
from App.routers.portfolio.service import PortfolioService
positions = await PortfolioService.get_positions(portfolio_id)
return [
{
"asset_type": str(position["asset_type"]).upper(),
"asset_id": position["asset_id"],
"quantity": Decimal(str(position["quantity"])),
}
for position in positions
if _safe_float(position.get("current_value")) > 0
]
async def _build_stock_asset(position: dict[str, Any]) -> OptimizerAsset | None:
stock = await Stock.get_or_none(id=position["asset_id"])
if stock is None:
return None
prices = await StockPriceData.filter(
stock_id=stock.id,
date__gte=date.today() - timedelta(days=365 * 5 + 7),
).order_by("date").values("date", "closing_price")
price_values = [(row["date"], _safe_float(row["closing_price"])) for row in prices]
latest = await StockPriceData.filter(stock=stock).order_by("-date").first()
if latest is None:
return None
metrics = await calculate_metrics(stock) or {}
capital_return = _annualized_return(price_values)
dividend_yield = _safe_float(metrics.get("dividend_yield")) / 100
liquidity = metrics.get("liquidity") or await calculate_liquidity_metrics(stock)
liquidity["history_years"] = round(_history_years(price_values), 2)
fundamentals = {
"eps": metrics.get("eps"),
"pe_ratio": metrics.get("pe_ratio"),
"pb_ratio": metrics.get("pb_ratio") or metrics.get("price_to_book"),
"ps_ratio": metrics.get("ps_ratio"),
"earnings_yield": metrics.get("earnings_yield"),
"payout_ratio": metrics.get("payout_ratio"),
"gross_margin": metrics.get("gross_margin"),
"net_margin": metrics.get("net_margin"),
"debt_to_equity": metrics.get("debt_to_equity"),
"source_name": metrics.get("source_name"),
"source_url": metrics.get("source_url"),
"data_updated_at": metrics.get("data_updated_at"),
}
expected_return = _fundamental_adjusted_stock_return(
base_return=(capital_return if capital_return is not None else 0.10) + dividend_yield,
dividend_yield=dividend_yield,
fundamentals=fundamentals,
liquidity=liquidity,
)
volatility_result = calculate_annualized_volatility(price_values)
volatility = (
volatility_result.annualized_volatility if volatility_result else 0.28
)
current_price = _safe_float(latest.closing_price)
quantity = _safe_float(position["quantity"])
current_value = quantity * current_price
return OptimizerAsset(
key=f"STOCK:{stock.id}",
asset_id=stock.id,
asset_type="STOCK",
symbol=stock.symbol,
name=stock.name,
quantity=quantity,
current_price=current_price,
current_value=current_value,
expected_return=max(min(expected_return, 0.60), -0.30),
volatility=max(volatility, 0.08),
income_yield=max(dividend_yield, 0.0),
fee_rate=_stock_fee_rate(current_value),
fundamentals=fundamentals,
liquidity=liquidity,
)
async def _build_fund_asset(position: dict[str, Any]) -> OptimizerAsset | None:
fund = await MutualFund.get_or_none(id=position["asset_id"])
if fund is None:
return None
rows = await FundPerformance.filter(
fund_id=fund.id,
record_date__gte=date.today() - timedelta(days=365 * 5 + 7),
).order_by("record_date").values("record_date", "nav_per_unit")
nav_values = [
(row["record_date"], _safe_float(row["nav_per_unit"]))
for row in rows
if row.get("nav_per_unit") is not None
]
latest = await FundPerformance.filter(fund=fund).order_by("-record_date").first()
if latest is None or latest.nav_per_unit is None:
return None
income_yield = await _fund_income_yield(fund)
expected_return = (_annualized_return(nav_values) or 0.13) + income_yield
volatility_result = calculate_annualized_volatility(nav_values)
volatility = (
volatility_result.annualized_volatility if volatility_result else 0.08
)
current_price = _safe_float(latest.nav_per_unit)
quantity = _safe_float(position["quantity"])
current_value = quantity * current_price
prospectus_terms = await _fund_prospectus_terms(fund)
redemption_days = prospectus_terms.get("redemption_days")
liquidity_score = 82 if isinstance(redemption_days, int) and redemption_days <= 7 else 65
if prospectus_terms.get("lock_in_days"):
liquidity_score -= min(int(prospectus_terms["lock_in_days"]) / 180 * 30, 30)
if not prospectus_terms.get("prospectus_backed"):
liquidity_score -= 12
if str(prospectus_terms.get("currency") or "TZS").upper() != "TZS":
liquidity_score -= 18
return OptimizerAsset(
key=f"FUND:{fund.id}",
asset_id=fund.id,
asset_type="FUND",
symbol=fund.name[:10].upper(),
name=fund.name,
quantity=quantity,
current_price=current_price,
current_value=current_value,
expected_return=max(min(expected_return, 0.35), 0.03),
volatility=max(volatility, 0.03),
income_yield=income_yield,
fee_rate=prospectus_terms["entry_fee_rate"],
liquidity={
**prospectus_terms,
"liquidity_score": round(max(liquidity_score, 20), 2),
"history_years": round(_history_years(nav_values), 2),
},
)
async def _build_bond_asset(position: dict[str, Any]) -> OptimizerAsset | None:
bond = await Bond.get_or_none(id=position["asset_id"])
if bond is None:
return None
pricing_context = await get_bond_pricing_context(bond)
price_per_100 = _safe_float(pricing_context.get("price_percent"), 100.0)
current_price = price_per_100 / 100
quantity = _safe_float(position["quantity"])
current_value = quantity * current_price
coupon_rate = _safe_float(bond.coupon_rate) / 100
latest_yield_percent = pricing_context.get("latest_yield_percent")
market_yield = (
_safe_float(latest_yield_percent) / 100
if latest_yield_percent not in (None, "")
else 0.0
)
expected_return = market_yield if market_yield > 0 else max(coupon_rate, 0.10)
income_yield = coupon_rate / current_price if current_price > 0 else coupon_rate
return OptimizerAsset(
key=f"BOND:{bond.id}",
asset_id=bond.id,
asset_type="BOND",
symbol=bond.isin,
name=f"{bond.maturity_years} Yr Treasury Bond",
quantity=quantity,
current_price=current_price,
current_value=current_value,
expected_return=expected_return,
volatility=0.04,
income_yield=income_yield,
fee_rate=0.0,
liquidity={
"liquidity_score": pricing_context.get("liquidity_score", 45),
"valuation_quality": pricing_context.get("valuation_quality"),
"execution_caveat": pricing_context.get("execution_caveat"),
"latest_trade_date": pricing_context.get("latest_trade_date"),
"price_source": pricing_context.get("price_source"),
"latest_yield_percent": latest_yield_percent,
"coupon_rate_percent": _safe_float(bond.coupon_rate),
"price_percent": price_per_100,
"primary_reinvestment_yield_percent": (
_safe_float((pricing_context.get("comparable_primary_market") or [{}])[0].get("implied_yield_percent"))
if pricing_context.get("comparable_primary_market")
else None
),
"same_bond_secondary_available": pricing_context.get("same_bond_secondary_available"),
"comparable_secondary_market": pricing_context.get("comparable_secondary_market") or [],
"comparable_median_yield_percent": pricing_context.get("comparable_median_yield_percent"),
"comparable_primary_market": pricing_context.get("comparable_primary_market") or [],
},
)
async def _build_assets(portfolio_id: int) -> list[OptimizerAsset]:
assets: list[OptimizerAsset] = []
for position in await _collect_positions(portfolio_id):
asset_type = position["asset_type"]
if asset_type == "STOCK":
asset = await _build_stock_asset(position)
elif asset_type in {"FUND", "UTT"}:
asset = await _build_fund_asset(position)
elif asset_type == "BOND":
asset = await _build_bond_asset(position)
else:
asset = None
if asset and asset.current_value > 0:
assets.append(asset)
return assets
def _asset_candidate_score(asset: OptimizerAsset, settings: dict[str, Any]) -> float:
liquidity_score = _safe_float((asset.liquidity or {}).get("liquidity_score")) / 100
valuation_penalty = 0.0
if asset.asset_type == "STOCK":
pe_ratio = _safe_float((asset.fundamentals or {}).get("pe_ratio"))
pb_ratio = _safe_float((asset.fundamentals or {}).get("pb_ratio"))
if pe_ratio > 30 or pb_ratio > 5:
valuation_penalty += 0.05
if pe_ratio < 0:
valuation_penalty += 0.08
return (
asset.expected_return * settings["return_weight"]
+ asset.income_yield * settings["income_weight"]
+ liquidity_score * 0.08
- asset.volatility * settings["volatility_penalty"]
- valuation_penalty
)
async def _build_new_candidate_assets(
current_assets: list[OptimizerAsset],
risk_profile: str | None,
risk_score: int | None,
limit: int = 8,
) -> list[OptimizerAsset]:
current_keys = {asset.key for asset in current_assets}
settings = _risk_settings(risk_profile, risk_score)
candidates: list[OptimizerAsset] = []
funds = await MutualFund.filter(status="Active").all()
for fund in funds:
if f"FUND:{fund.id}" in current_keys:
continue
asset = await _build_fund_asset({"asset_id": fund.id, "quantity": Decimal("1")})
if asset:
if not (asset.liquidity or {}).get("prospectus_backed"):
continue
if str((asset.liquidity or {}).get("currency") or "TZS").upper() != "TZS":
continue
candidates.append(replace(asset, quantity=0.0, current_value=0.0, is_candidate=True))
stocks = await Stock.all()
for stock in stocks:
if f"STOCK:{stock.id}" in current_keys:
continue
asset = await _build_stock_asset({"asset_id": stock.id, "quantity": Decimal("1")})
if not asset:
continue
max_buy_value = _safe_float((asset.liquidity or {}).get("max_buy_value_20d"))
if max_buy_value <= 0 and _safe_float((asset.liquidity or {}).get("liquidity_score")) < 20:
continue
candidates.append(replace(asset, quantity=0.0, current_value=0.0, is_candidate=True))
return sorted(
candidates,
key=lambda asset: _asset_candidate_score(asset, settings),
reverse=True,
)[:limit]
async def _stock_candidate(stock: Stock) -> dict[str, Any] | None:
asset = await _build_stock_asset({"asset_id": stock.id, "quantity": Decimal("1")})
if asset is None:
return None
return {
"asset_id": asset.asset_id,
"asset_type": asset.asset_type,
"symbol": asset.symbol,
"name": asset.name,
"expected_return": round(asset.expected_return, 4),
"volatility": round(asset.volatility, 4),
"income_yield": round(asset.income_yield, 4),
"liquidity": asset.liquidity,
"fundamentals": asset.fundamentals,
"reason": "Stock candidate with fundamentals, dividend history, and observed DSE liquidity.",
}
async def _fund_candidate(fund: MutualFund) -> dict[str, Any] | None:
asset = await _build_fund_asset({"asset_id": fund.id, "quantity": Decimal("1")})
if asset is None:
return None
return {
"asset_id": asset.asset_id,
"asset_type": asset.asset_type,
"symbol": asset.symbol,
"name": asset.name,
"expected_return": round(asset.expected_return, 4),
"volatility": round(asset.volatility, 4),
"income_yield": round(asset.income_yield, 4),
"pays_income": asset.income_yield > 0,
"liquidity": asset.liquidity,
"reason": (
"Income-paying fund candidate with lower measured volatility."
if asset.income_yield > 0
else "Fund candidate with lower measured volatility."
),
}
async def _suggest_alternatives(current_assets: list[OptimizerAsset], risk_profile: str | None, risk_score: int | None) -> list[dict[str, Any]]:
current_keys = {asset.key for asset in current_assets}
settings = _risk_settings(risk_profile, risk_score)
candidates: list[dict[str, Any]] = []
funds = await MutualFund.filter(status="Active").all()
for fund in funds:
if f"FUND:{fund.id}" in current_keys:
continue
candidate = await _fund_candidate(fund)
if candidate:
candidates.append(candidate)
stocks = await Stock.all()
for stock in stocks:
if f"STOCK:{stock.id}" in current_keys:
continue
candidate = await _stock_candidate(stock)
if candidate:
candidates.append(candidate)
if not candidates:
return []
current_volatility = max((asset.volatility for asset in current_assets), default=0.0)
max_volatility = current_volatility * (0.85 if settings["key"] == "conservative" else 1.10)
safer = [item for item in candidates if item["volatility"] <= max_volatility] or candidates
def score(item: dict[str, Any]) -> float:
return (
item["expected_return"] * settings["return_weight"]
+ item.get("income_yield", 0.0) * settings["income_weight"]
- item["volatility"] * settings["volatility_penalty"]
)
return sorted(safer, key=score, reverse=True)[:5]
def _is_igrowth(asset: OptimizerAsset) -> bool:
text = f"{asset.symbol} {asset.name}".upper()
return "IGROWTH" in text or ("ITRUST" in text and "GROWTH" in text)
async def _igrowth_proxy_context(assets: list[OptimizerAsset], total_value: float) -> dict[str, Any] | None:
igrowth_assets = [asset for asset in assets if _is_igrowth(asset)]
if not igrowth_assets:
return None
stock_candidates: list[OptimizerAsset] = []
for stock in await Stock.all():
asset = await _build_stock_asset({"asset_id": stock.id, "quantity": Decimal("1")})
if asset is None:
continue
liquidity_score = _safe_float((asset.liquidity or {}).get("liquidity_score"))
max_buy_value = _safe_float((asset.liquidity or {}).get("max_buy_value_20d"))
if liquidity_score >= 35 and max_buy_value >= total_value * 0.02:
stock_candidates.append(asset)
if not stock_candidates:
return {
"method": "top-liquid-stock proxy",
"available": False,
"message": "IGROWTH holdings were not available and no liquid stock proxy basket passed the volume screen.",
}
top = sorted(
stock_candidates,
key=lambda asset: _asset_candidate_score(asset, _risk_settings("growth", None)),
reverse=True,
)[:5]
weights = np.array([
max(_safe_float((asset.liquidity or {}).get("liquidity_score")), 1.0)
for asset in top
])
weights = weights / weights.sum()
proxy_return = float(np.dot(weights, np.array([asset.expected_return for asset in top])))
proxy_capacity = sum(_safe_float((asset.liquidity or {}).get("max_buy_value_20d")) for asset in top)
notes = []
for asset in igrowth_assets:
asset.expected_return = min(asset.expected_return + 0.01, 0.35)
gap = proxy_return - asset.expected_return
if gap > 0.05 and proxy_capacity >= asset.current_value * 0.25:
penalty = min(0.04, gap * 0.25)
asset.expected_return = max(asset.expected_return - penalty, 0.03)
notes.append(
f"{asset.symbol} underperformed the liquid-stock proxy, but keeps a liquidity/diversification credit."
)
return {
"method": "top-liquid-stock proxy",
"available": True,
"igrowth_symbols": [asset.symbol for asset in igrowth_assets],
"proxy_expected_return": round(proxy_return, 4),
"proxy_capacity_20d": round(proxy_capacity, 2),
"proxy_assets": [
{
"symbol": asset.symbol,
"name": asset.name,
"expected_return": round(asset.expected_return, 4),
"volatility": round(asset.volatility, 4),
"liquidity_score": round(_safe_float((asset.liquidity or {}).get("liquidity_score")), 2),
"max_buy_value_20d": round(_safe_float((asset.liquidity or {}).get("max_buy_value_20d")), 2),
}
for asset in top
],
"notes": notes
or [
"IGROWTH is treated as a liquid/diversified wrapper and compared against a volume-screened top-stock proxy."
],
}
def _portfolio_stats(weights: np.ndarray, returns: np.ndarray, vols: np.ndarray) -> dict:
portfolio_return = float(np.dot(weights, returns))
portfolio_vol = float(np.sqrt(np.dot(weights**2, vols**2)))
sharpe = (
(portfolio_return - DEFAULT_RISK_FREE_RATE) / portfolio_vol
if portfolio_vol > 0
else 0.0
)
return {
"expected_return": portfolio_return,
"volatility": portfolio_vol,
"sharpe": sharpe,
}
def _class_totals(weights: np.ndarray, assets: list[OptimizerAsset]) -> dict[str, float]:
totals = {"STOCK": 0.0, "FUND": 0.0, "BOND": 0.0}
for weight, asset in zip(weights, assets):
totals[asset.asset_type] = totals.get(asset.asset_type, 0.0) + float(weight)
return totals
def _respects_limits(weights: np.ndarray, assets: list[OptimizerAsset], settings: dict[str, Any]) -> bool:
effective_max_asset_weight = max(float(settings["max_asset_weight"]), 1 / len(assets))
if np.any(weights < 0) or np.any(weights > effective_max_asset_weight):
return False
available_classes = {asset.asset_type for asset in assets}
if len(available_classes) == 1:
return True
class_totals = _class_totals(weights, assets)
class_limits = settings["class_limits"]
for asset_type in available_classes:
total = class_totals.get(asset_type, 0.0)
min_weight, max_weight = class_limits[asset_type]
if total < min_weight or total > max_weight:
return False
return True
def _estimate_rebalance_fees(
current_weights: np.ndarray,
suggested_weights: np.ndarray,
assets: list[OptimizerAsset],
total_value: float,
) -> float:
return sum(item["estimated_fee"] for item in _fee_breakdown(current_weights, suggested_weights, assets, total_value))
def _liquidity_trade_warnings(
current_weights: np.ndarray,
suggested_weights: np.ndarray,
assets: list[OptimizerAsset],
total_value: float,
) -> list[str]:
warnings = []
for current, suggested, asset in zip(current_weights, suggested_weights, assets):
if asset.asset_type != "STOCK" or suggested <= current:
continue
trade_amount = float(suggested - current) * total_value
max_buy_value = _safe_float((asset.liquidity or {}).get("max_buy_value_20d"))
if max_buy_value <= 0:
warnings.append(f"{asset.symbol} has no recent turnover capacity, so new buys should be treated as speculative.")
elif trade_amount > max_buy_value:
warnings.append(
f"{asset.symbol} buy is capped by observed volume: suggested demand exceeds about 10% of 20 trading days of average turnover."
)
return warnings
def _liquidity_capacity_penalty(
current_weights: np.ndarray,
suggested_weights: np.ndarray,
assets: list[OptimizerAsset],
total_value: float,
) -> float:
penalty = 0.0
for current, suggested, asset in zip(current_weights, suggested_weights, assets):
if suggested <= current:
continue
trade_amount = float(suggested - current) * total_value
if asset.asset_type == "STOCK":
max_buy_value = _safe_float((asset.liquidity or {}).get("max_buy_value_20d"))
if max_buy_value <= 0:
penalty += 0.25
continue
if trade_amount > max_buy_value:
penalty += min((trade_amount / max_buy_value - 1) * 0.08, 0.35)
elif asset.asset_type == "FUND":
terms = asset.liquidity or {}
minimum = (
_safe_float(terms.get("min_initial"))
if current <= 0
else _safe_float(terms.get("min_additional"))
)
if minimum and trade_amount < minimum:
penalty += 0.18
if not terms.get("prospectus_backed"):
penalty += 0.08
if _safe_float(terms.get("lock_in_days")) > 0:
penalty += min(_safe_float(terms.get("lock_in_days")) / 365 * 0.08, 0.08)
return penalty
def _apply_liquidity_caps(
current_weights: np.ndarray,
suggested_weights: np.ndarray,
assets: list[OptimizerAsset],
total_value: float,
settings: dict[str, Any],
) -> tuple[np.ndarray, list[str]]:
capped = suggested_weights.copy()
warnings = []
excess = 0.0
for index, (current, suggested, asset) in enumerate(zip(current_weights, capped, assets)):
if suggested <= current:
continue
if asset.asset_type == "STOCK":
max_buy_value = _safe_float((asset.liquidity or {}).get("max_buy_value_20d"))
if max_buy_value <= 0:
max_weight = float(current)
else:
max_weight = float(current) + (max_buy_value / total_value if total_value else 0.0)
if suggested > max_weight:
excess += float(suggested - max_weight)
capped[index] = max_weight
warnings.append(
f"{asset.symbol} increase was capped because DSE volume may not support the full trade size."
)
elif asset.asset_type == "FUND":
trade_amount = float(suggested - current) * total_value
terms = asset.liquidity or {}
minimum = (
_safe_float(terms.get("min_initial"))
if current <= 0
else _safe_float(terms.get("min_additional"))
)
if minimum and trade_amount < minimum:
excess += float(capped[index] - current)
capped[index] = 0.0 if current <= 0 else current
warnings.append(
f"{asset.symbol} buy was removed because it is below the prospectus minimum investment/addition."
)
if excess <= 0:
return capped, warnings
effective_max_asset_weight = max(float(settings["max_asset_weight"]), 1 / len(assets))
receiver_order = {"FUND": 0, "BOND": 1, "STOCK": 2}
def can_receive(index: int, asset: OptimizerAsset) -> bool:
room = max(effective_max_asset_weight - float(capped[index]), 0.0)
if room <= 0.0001:
return False
if asset.asset_type != "FUND":
return True
terms = asset.liquidity or {}
minimum = (
_safe_float(terms.get("min_initial"))
if capped[index] <= 0
else _safe_float(terms.get("min_additional"))
)
return not minimum or room * total_value >= minimum
receivers = sorted(
[
(index, asset)
for index, asset in enumerate(assets)
if can_receive(index, asset)
],
key=lambda item: (receiver_order.get(item[1].asset_type, 9), -item[1].expected_return),
)
for index, _asset in receivers:
if excess <= 0:
break
room = max(effective_max_asset_weight - float(capped[index]), 0.0)
addition = min(room, excess)
capped[index] += addition
excess -= addition
if capped.sum() > 0:
capped = capped / capped.sum()
return capped, warnings
def _fee_breakdown(
current_weights: np.ndarray,
suggested_weights: np.ndarray,
assets: list[OptimizerAsset],
total_value: float,
) -> list[dict[str, Any]]:
items = []
for current, suggested, asset in zip(current_weights, suggested_weights, assets):
difference = float(suggested - current)
trade_amount = abs(difference) * total_value
if trade_amount <= 0:
continue
side = "BUY" if difference > 0 else "SELL"
fee_rate = 0.0
fee_source = "No explicit transaction fee"
fee_components: dict[str, float] = {}
if asset.asset_type == "STOCK":
band = _stock_fee_band(trade_amount)
if band:
fee_components = {
"brokerage_commission": round(trade_amount * (_safe_float(band.get("brokerage_commission")) / 100), 2),
"transaction_fee_cmsa": round(trade_amount * (_safe_float(band.get("transaction_fee_cmsa")) / 100), 2),
"transaction_fee_dse": round(trade_amount * (_safe_float(band.get("transaction_fee_dse")) / 100), 2),
"fidelity_fee": round(trade_amount * (_safe_float(band.get("fidelity_fee")) / 100), 2),
"csd_fee": round(trade_amount * (_safe_float(band.get("csd_fee")) / 100), 2),
}
fee_rate = _safe_float(band.get("total_cost_to_investor")) / 100
fee_source = f"DSE equity fee band: {band.get('label')}"
else:
fee_rate = asset.fee_rate
fee_source = "DSE equity fee fallback"
elif asset.asset_type == "FUND" and side == "BUY":
fee_rate = asset.fee_rate
fee_source = "Fund entry load"
elif asset.asset_type == "FUND" and side == "SELL":
fee_rate = _safe_float((asset.liquidity or {}).get("exit_fee_rate"))
fee_source = "Fund exit load from prospectus metadata"
if fee_rate <= 0:
continue
fee = trade_amount * fee_rate
items.append(
{
"asset_id": asset.asset_id,
"asset_type": asset.asset_type,
"symbol": asset.symbol,
"name": asset.name,
"side": side,
"trade_amount": round(trade_amount, 2),
"buy_amount": round(trade_amount if side == "BUY" else 0.0, 2),
"sell_amount": round(trade_amount if side == "SELL" else 0.0, 2),
"fee_rate": round(fee_rate, 6),
"estimated_fee": round(fee, 2),
"fee_components": fee_components,
"fee_source": fee_source,
}
)
return sorted(items, key=lambda item: item["estimated_fee"], reverse=True)
def _fee_summary(fee_items: list[dict[str, Any]]) -> dict[str, Any]:
by_type: dict[str, float] = {}
for item in fee_items:
by_type[item["asset_type"]] = by_type.get(item["asset_type"], 0.0) + item["estimated_fee"]
total = sum(by_type.values())
return {
"total": round(total, 2),
"by_asset_type": {key: round(value, 2) for key, value in by_type.items()},
"items": fee_items,
}
def _advisor_comment(
current_stats: dict[str, float],
suggested_stats: dict[str, float],
allocations: list[dict[str, Any]],
estimated_fees: float,
risk_label: str,
) -> dict[str, Any]:
increases = [item for item in allocations if item["difference"] > 0.03]
reductions = [item for item in allocations if item["difference"] < -0.03]
top_increase = increases[0] if increases else None
new_buys = [item for item in increases if item.get("is_new_candidate")]
top_reduction = sorted(reductions, key=lambda item: item["difference"])[0] if reductions else None
sharpe_delta = suggested_stats["sharpe"] - current_stats["sharpe"]
return_delta = suggested_stats["expected_return"] - current_stats["expected_return"]
summary = (
f"The suggested allocation is tuned for a {risk_label.lower()} risk setting while trying "
"to avoid one position carrying too much of the risk."
)
if sharpe_delta > 0.05:
summary = (
"The suggested allocation looks stronger on a risk-adjusted basis: it "
"improves the expected Sharpe score while keeping the portfolio growth-focused."
)
elif return_delta > 0:
summary = (
"The suggested allocation mainly improves expected growth, but the risk-adjusted "
"improvement is modest, so rebalance gradually rather than all at once."
)
actions = []
if top_increase:
if top_increase.get("is_new_candidate"):
actions.append(
f"Buy {top_increase['symbol']} toward {top_increase['suggested_weight'] * 100:.1f}% because it passed the return, valuation, fee, and liquidity screens."
)
else:
actions.append(
f"Increase {top_increase['symbol']} toward {top_increase['suggested_weight'] * 100:.1f}% because its growth return still looks attractive after estimated fees."
)
if new_buys and (not top_increase or not top_increase.get("is_new_candidate")):
actions.append(
"Add selected new stock/fund candidates only where observed trading volume can support the buy size."
)
if top_reduction:
actions.append(
f"Reduce {top_reduction['symbol']} toward {top_reduction['suggested_weight'] * 100:.1f}% to lower concentration or weaker risk-adjusted exposure."
)
if estimated_fees > 0:
actions.append(
f"Estimated rebalance fees are about TZS {estimated_fees:,.0f}, so the changes should be made only if the expected improvement is worth that cost."
)
if not actions:
actions.append("No major rebalance is needed; current weights are close to the suggested growth allocation.")
return {
"title": "Growth allocation view",
"summary": summary,
"key_actions": actions[:3],
"caution": (
"This is an allocation model, not a guarantee. It uses historical prices, NAVs, "
"dividends, fund payout flags, bond coupons, cached fundamentals, observed stock volume, and known transaction costs; financial statement quality and future news still need human review."
),
}
def _bond_execution_note(asset: OptimizerAsset) -> str:
liquidity = asset.liquidity or {}
secondary = liquidity.get("comparable_secondary_market") or []
primary = liquidity.get("comparable_primary_market") or []
parts = []
if liquidity.get("same_bond_secondary_available"):
parts.append(
f"same bond last traded {liquidity.get('latest_trade_date')} at yield "
f"{_safe_float(liquidity.get('latest_yield_percent')):.2f}%"
)
elif secondary:
median_yield = liquidity.get("comparable_median_yield_percent")
parts.append(
f"no same-bond print; comparable secondary prints show median yield "
f"{_safe_float(median_yield):.2f}%"
)
else:
parts.append("no fresh same-bond or comparable secondary print is stored")
if primary:
top = primary[0]
parts.append(
f"recent primary auction reference {top.get('bond_no')} "
f"from {top.get('auction_date')} "
f"({top.get('maturity_years')}y) approximate reinvestment yield "
f"{_safe_float(top.get('implied_yield_percent')):.2f}% at price "
f"{_safe_float(top.get('price_per_100')):.2f}"
)
else:
parts.append("no recent comparable primary auction reference is stored")
caveat = liquidity.get("execution_caveat")
if caveat:
parts.append(str(caveat))
return "Secondary/primary market check: " + "; ".join(parts)
def _bond_return_worth_note(asset: OptimizerAsset) -> str:
liquidity = asset.liquidity or {}
net_return = asset.expected_return - asset.fee_rate
hurdle = DEFAULT_RISK_FREE_RATE
spread = net_return - hurdle
price_percent = _safe_float(liquidity.get("price_percent"), asset.current_price * 100)
latest_yield = _safe_float(liquidity.get("latest_yield_percent"), net_return * 100)
income_yield = asset.income_yield * 100
verdict = "adequate" if spread >= 0.015 else "thin" if spread >= 0 else "weak"
premium_note = (
f" premium price {price_percent:.2f}% means coupon income is not the same as total return;"
if price_percent > 110
else ""
)
return (
f"Return worth check: {verdict}; market yield {latest_yield:.2f}% "
f"vs {hurdle * 100:.2f}% hurdle ({spread * 100:+.2f}%), "
f"income yield on invested price about {income_yield:.2f}%.{premium_note}"
)
def _bond_coupon_advantage(asset: OptimizerAsset) -> float:
liquidity = asset.liquidity or {}
coupon = _safe_float(liquidity.get("coupon_rate_percent")) / 100
reinvestment_yield = _safe_float(liquidity.get("primary_reinvestment_yield_percent")) / 100
if coupon <= 0 or reinvestment_yield <= 0:
return 0.0
return coupon - reinvestment_yield
def _is_irreplaceable_income_bond(asset: OptimizerAsset) -> bool:
return (
asset.asset_type == "BOND"
and _bond_coupon_advantage(asset) >= HIGH_COUPON_ADVANTAGE_BPS / 10_000
)
def _has_rock_solid_fundamentals(asset: OptimizerAsset) -> bool:
fundamentals = asset.fundamentals or {}
if asset.asset_type != "STOCK":
return False
pe_ratio = _safe_float(fundamentals.get("pe_ratio"))
pb_ratio = _safe_float(fundamentals.get("pb_ratio"))
eps = _safe_float(fundamentals.get("eps"))
debt_to_equity = _safe_float(fundamentals.get("debt_to_equity"), default=-1)
return (
eps > 0
and 0 < pe_ratio <= 22
and 0 < pb_ratio <= 3.5
and 0 <= debt_to_equity <= 150
)
def _has_visible_macro_tailwind(asset: OptimizerAsset) -> bool:
data = {**(asset.fundamentals or {}), **(asset.liquidity or {})}
return bool(
data.get("macro_tailwind")
or data.get("sector_tailwind")
or data.get("rate_tailwind")
or str(data.get("macro_context") or "").lower() in {"tailwind", "supportive", "positive"}
)
def _clean_exit_liquidity(asset: OptimizerAsset, trade_value: float) -> bool:
liquidity = asset.liquidity or {}
if asset.asset_type == "STOCK":
max_buy_value = _safe_float(liquidity.get("max_buy_value_20d"))
return max_buy_value > 0 and trade_value <= max_buy_value
if asset.asset_type == "FUND":
redemption_days = _safe_float(liquidity.get("redemption_days"), default=999)
return redemption_days <= 7 and bool(liquidity.get("prospectus_backed"))
return False
def _replacement_quality_for_irreplaceable_bond(
bond: OptimizerAsset,
assets: list[OptimizerAsset],
suggested_weights: np.ndarray,
current_weights: np.ndarray,
total_value: float,
) -> tuple[bool, list[str]]:
coupon_advantage = _bond_coupon_advantage(bond)
required_excess = coupon_advantage + LEGACY_BOND_REPLACEMENT_MARGIN
increased_assets = [
(asset, float(suggested - current) * total_value)
for asset, suggested, current in zip(assets, suggested_weights, current_weights)
if asset.key != bond.key and suggested > current + 0.01
]
failures: list[str] = []
for asset, trade_value in increased_assets:
net_return_advantage = asset.expected_return - asset.fee_rate - bond.income_yield
gates = {
"return_margin": net_return_advantage >= required_excess,
"low_moderate_volatility": asset.volatility <= LOW_MODERATE_VOLATILITY_MAX,
"rock_solid_fundamentals": _has_rock_solid_fundamentals(asset),
"macro_tailwind": _has_visible_macro_tailwind(asset),
"history_and_liquidity": (
_safe_float((asset.liquidity or {}).get("history_years")) >= MIN_REPLACEMENT_HISTORY_YEARS
and _clean_exit_liquidity(asset, trade_value)
),
}
if all(gates.values()):
return True, []
failed = ", ".join(name for name, passed in gates.items() if not passed)
failures.append(f"{asset.symbol} failed replacement gates: {failed}")
if not failures:
failures.append("No increased replacement asset was available for the income stream.")
return False, failures
def _protect_irreplaceable_bonds(
current_weights: np.ndarray,
suggested_weights: np.ndarray,
assets: list[OptimizerAsset],
total_value: float,
) -> tuple[np.ndarray, list[str]]:
protected = suggested_weights.copy()
warnings: list[str] = []
for index, asset in enumerate(assets):
if not _is_irreplaceable_income_bond(asset) or protected[index] >= current_weights[index]:
continue
qualified, failures = _replacement_quality_for_irreplaceable_bond(
asset,
assets,
protected,
current_weights,
total_value,
)
if qualified:
continue
restored = float(current_weights[index] - protected[index])
protected[index] = current_weights[index]
warnings.append(
f"{asset.symbol} reduction blocked: coupon materially exceeds recent primary reinvestment yield, so default is hold to maturity unless all replacement gates pass. "
+ " ".join(failures[:2])
)
receiver_indices = [
i
for i, (current, suggested) in enumerate(zip(current_weights, protected))
if i != index and suggested > current
]
receiver_total = sum(float(protected[i] - current_weights[i]) for i in receiver_indices)
if receiver_total > 0:
for i in receiver_indices:
reduction = restored * float(protected[i] - current_weights[i]) / receiver_total
protected[i] = max(current_weights[i], protected[i] - reduction)
total = float(protected.sum())
if total > 0:
protected = protected / total
return protected, warnings
def _optimize_sync(
assets_payload: list[dict[str, Any]],
simulations: int,
risk_profile: str | None,
risk_score: int | None,
alternatives: list[dict[str, Any]],
igrowth_context: dict[str, Any] | None = None,
) -> dict[str, Any]:
assets = [OptimizerAsset(**payload) for payload in assets_payload]
settings = _risk_settings(risk_profile, risk_score)
total_value = sum(asset.current_value for asset in assets)
current_weights = np.array([asset.current_value / total_value for asset in assets])
returns = np.array([asset.expected_return for asset in assets])
vols = np.array([asset.volatility for asset in assets])
income_yields = np.array([asset.income_yield for asset in assets])
best_weights = current_weights.copy()
best_score = -float("inf")
rng = np.random.default_rng(42)
attempts = max(simulations, 1000)
for _ in range(attempts):
weights = rng.dirichlet(np.ones(len(assets)))
if not _respects_limits(weights, assets, settings):
continue
stats = _portfolio_stats(weights, returns, vols)
fees = _estimate_rebalance_fees(current_weights, weights, assets, total_value)
fee_drag = fees / total_value if total_value else 0.0
income_score = float(np.dot(weights, income_yields))
liquidity_penalty = _liquidity_capacity_penalty(current_weights, weights, assets, total_value)
score = (
stats["sharpe"]
+ settings["return_weight"] * stats["expected_return"]
+ settings["income_weight"] * income_score
- settings["volatility_penalty"] * stats["volatility"]
- fee_drag
- liquidity_penalty
)
if score > best_score:
best_score = score
best_weights = weights
best_weights, liquidity_warnings = _apply_liquidity_caps(
current_weights,
best_weights,
assets,
total_value,
settings,
)
best_weights, bond_protection_warnings = _protect_irreplaceable_bonds(
current_weights,
best_weights,
assets,
total_value,
)
liquidity_warnings.extend(bond_protection_warnings)
liquidity_warnings.extend(
_liquidity_trade_warnings(current_weights, best_weights, assets, total_value)
)
current_stats = _portfolio_stats(current_weights, returns, vols)
suggested_stats = _portfolio_stats(best_weights, returns, vols)
estimated_fees = _estimate_rebalance_fees(
current_weights, best_weights, assets, total_value
)
fee_items = _fee_breakdown(current_weights, best_weights, assets, total_value)
fee_summary = _fee_summary(fee_items)
allocations = []
for asset, current_weight, suggested_weight in zip(
assets, current_weights, best_weights
):
difference = float(suggested_weight - current_weight)
reason = "Hold allocation"
if difference > 0.03:
reason = (
"Buy new candidate after valuation, liquidity, fees, and prospectus checks"
if asset.is_candidate
else "Increase for growth-adjusted return after fees"
)
if asset.asset_type == "BOND":
reason = f"{reason}. {_bond_return_worth_note(asset)} {_bond_execution_note(asset)}"
elif difference < -0.03:
reason = "Reduce concentration or weaker risk-adjusted return"
if asset.asset_type == "BOND":
reason = f"{reason}. {_bond_return_worth_note(asset)} {_bond_execution_note(asset)}"
elif asset.asset_type == "BOND":
if _is_irreplaceable_income_bond(asset):
reason = (
"Hold to maturity by default: coupon materially exceeds recent primary reinvestment yields, "
"and no replacement passed all return, volatility, fundamentals, macro, history, and liquidity gates"
)
reason = f"{reason}. {_bond_return_worth_note(asset)} {_bond_execution_note(asset)}"
allocations.append(
{
"asset_id": asset.asset_id,
"asset_type": asset.asset_type,
"symbol": asset.symbol,
"name": asset.name,
"current_weight": round(float(current_weight), 4),
"suggested_weight": round(float(suggested_weight), 4),
"difference": round(difference, 4),
"current_value": round(asset.current_value, 2),
"suggested_value": round(float(suggested_weight) * total_value, 2),
"rebalance_amount": round(difference * total_value, 2),
"expected_return": round(asset.expected_return, 4),
"volatility": round(asset.volatility, 4),
"income_yield": round(asset.income_yield, 4),
"fee_rate": round(asset.fee_rate, 4),
"is_new_candidate": asset.is_candidate,
"fundamentals": asset.fundamentals,
"liquidity": asset.liquidity,
"reason": reason,
}
)
sorted_allocations = sorted(
allocations,
key=lambda item: item["suggested_weight"],
reverse=True,
)
return {
"objective": "growth",
"risk_profile": settings["key"],
"risk_label": settings["label"],
"risk_score": settings["risk_score"],
"total_value": round(total_value, 2),
"risk_free_rate": DEFAULT_RISK_FREE_RATE,
"constraints": {
"max_asset_weight": max(float(settings["max_asset_weight"]), 1 / len(assets)),
"class_limits": settings["class_limits"],
"class_limits_apply_only_when_multiple_asset_classes_exist": True,
},
"current": {k: round(v, 4) for k, v in current_stats.items()},
"suggested": {k: round(v, 4) for k, v in suggested_stats.items()},
"estimated_rebalance_fees": round(estimated_fees, 2),
"fee_breakdown": fee_summary,
"liquidity_warnings": liquidity_warnings,
"igrowth_proxy_comparison": igrowth_context,
"advisor_comment": _advisor_comment(
current_stats,
suggested_stats,
sorted_allocations,
estimated_fees,
settings["label"],
),
"allocations": sorted_allocations,
"alternatives": alternatives,
"notes": [
"Suggested allocations can include new stock and fund candidates when they pass valuation, liquidity, fee, and risk checks.",
"Stock buy fee estimates use the stored DSE transaction fee bands and stock increases are capped by observed DSE turnover capacity.",
"Stock dividends, fund income flags, bond coupons, and fund entry loads are included where available.",
"Stock fundamentals use cached Simply Wall St snapshots when available; missing EPS, PE, or PB values are not guessed.",
"Optimization uses Monte Carlo simulation to stay lightweight on Hugging Face.",
],
}
async def analyze_portfolio_growth_allocation(
portfolio_id: int,
simulations: int = DEFAULT_SIMULATIONS,
risk_profile: str | None = None,
risk_score: int | None = None,
) -> dict[str, Any]:
current_assets = await _build_assets(portfolio_id)
if not current_assets:
raise ValueError("At least one priced asset is required for allocation analysis")
alternatives = await _suggest_alternatives(current_assets, risk_profile, risk_score)
settings = _risk_settings(risk_profile, risk_score)
total_value = sum(asset.current_value for asset in current_assets)
igrowth_context = await _igrowth_proxy_context(current_assets, total_value)
candidate_assets = await _build_new_candidate_assets(current_assets, risk_profile, risk_score)
assets = current_assets + candidate_assets
if len(assets) == 1:
asset = assets[0]
current_stats = {
"expected_return": asset.expected_return,
"volatility": asset.volatility,
"sharpe": (
(asset.expected_return - DEFAULT_RISK_FREE_RATE) / asset.volatility
if asset.volatility > 0
else 0.0
),
}
allocation = {
"asset_id": asset.asset_id,
"asset_type": asset.asset_type,
"symbol": asset.symbol,
"name": asset.name,
"current_weight": 1.0,
"suggested_weight": 1.0,
"difference": 0.0,
"current_value": round(asset.current_value, 2),
"suggested_value": round(asset.current_value, 2),
"rebalance_amount": 0.0,
"expected_return": round(asset.expected_return, 4),
"volatility": round(asset.volatility, 4),
"fee_rate": round(asset.fee_rate, 4),
"reason": "Only priced asset in portfolio; add other assets before optimization can rebalance.",
}
return {
"objective": "growth",
"risk_profile": settings["key"],
"risk_label": settings["label"],
"risk_score": settings["risk_score"],
"total_value": round(asset.current_value, 2),
"risk_free_rate": DEFAULT_RISK_FREE_RATE,
"constraints": {
"max_asset_weight": 1.0,
"class_limits": GROWTH_CLASS_LIMITS,
"class_limits_apply_only_when_multiple_asset_classes_exist": True,
},
"current": {k: round(v, 4) for k, v in current_stats.items()},
"suggested": {k: round(v, 4) for k, v in current_stats.items()},
"estimated_rebalance_fees": 0.0,
"fee_breakdown": _fee_summary([]),
"advisor_comment": {
"title": "Concentration warning",
"summary": (
f"Your portfolio is currently 100% allocated to {asset.symbol}. "
"That may have worked well, but the growth engine is concentrated in one security, "
"so the next improvement is diversification rather than reweighting."
),
"key_actions": [
"Add at least one more priced asset before running a full optimizer.",
"For a growth portfolio, consider blending stocks with a liquid fund, ETF, or bond exposure to reduce single-name risk.",
"Keep fees in mind: adding a new DSE stock position has transaction costs, so diversify in meaningful ticket sizes.",
],
"caution": (
"The model cannot calculate an optimized allocation from one asset alone. "
"It can only flag concentration risk and explain what data is missing."
),
},
"allocations": [allocation],
"alternatives": alternatives,
"notes": [
"Single-asset portfolios receive a concentration analysis instead of a failed optimization.",
"Add at least two priced assets to enable portfolio rebalancing advice.",
],
}
return await asyncio.to_thread(
_optimize_sync,
[asset.__dict__ for asset in assets],
simulations,
risk_profile,
risk_score,
alternatives,
igrowth_context,
)
async def run_portfolio_analysis_task(
task_id: int,
portfolio_id: int,
simulations: int = DEFAULT_SIMULATIONS,
risk_profile: str | None = None,
risk_score: int | None = None,
) -> None:
await ImportTask.filter(id=task_id).update(
status=TaskStatus.RUNNING,
details={
"portfolio_id": portfolio_id,
"risk_profile": _risk_settings(risk_profile, risk_score)["key"],
"risk_score": _risk_settings(risk_profile, risk_score)["risk_score"],
"status_message": "Portfolio growth allocation analysis is running",
},
)
try:
result = await analyze_portfolio_growth_allocation(portfolio_id, simulations, risk_profile, risk_score)
except Exception as exc:
await ImportTask.filter(id=task_id).update(
status=TaskStatus.FAILED,
details={
"portfolio_id": portfolio_id,
"error": str(exc),
},
)
return
await ImportTask.filter(id=task_id).update(
status=TaskStatus.COMPLETED,
details={
"portfolio_id": portfolio_id,
"result": result,
},
)