""" Data Engine: -d mode Reads the fund-stats CSV and exports processed Excel matching Processed data.xlsx format. Layout (matching target XLSX): - One combined sheet with all fund categories - Header row (light green #C9FFCC) - For each category: - Category header row (no fill, bold text) - BM Index row (Col A: #BAEAEE, CAGR cols F,G,H,I: #C4EFFF) - Category Average row (Col A: #BAEAEE, CAGR cols F,G,H,I + P/E,P/B cols L,M: #C4EFFF) - Fund rows sorted by score (weightage) descending, strictly largest to lowest - Weightage scoring: Compare fund CAGR vs Category Average (NOT BM Index) - 1Y CAGR beats Cat Avg: 2 pts - 3Y CAGR beats Cat Avg: 3 pts - 5Y CAGR beats Cat Avg: 4 pts - 10Y CAGR beats Cat Avg: 5 pts - Max possible: 14 pts - Yellow background (#F1FFB6) on Col A only if Weightage >= 8 - NO green/red font coloring on CAGR cells (plain black only) - Category Average row Col B is EMPTY (no benchmark type) """ import csv import math import re from datetime import datetime from pathlib import Path from typing import List, Optional, Tuple, Dict, Any from openpyxl import Workbook from openpyxl.styles import PatternFill, Font, Alignment, Border, Side from openpyxl.utils import get_column_letter from openpyxl.formatting.rule import Rule, CellIsRule, FormulaRule from openpyxl.styles.differential import DifferentialStyle from src.models import Fund from src.weightage import compute_scores, drawdown_zero_fix from src.reference_data import extract_reference_data, get_fund_weightage_from_reference, DEFAULT_REFERENCE_PATH # ─── Color palette ───────────────────────────────────────────────────────────────── FILL_HEADER = PatternFill(start_color="C9FFCC", end_color="C9FFCC", fill_type="solid") FILL_BM_ROW = PatternFill(start_color="BAEAEE", end_color="BAEAEE", fill_type="solid") FILL_BM_CAGR = PatternFill(start_color="C4EFFF", end_color="C4EFFF", fill_type="solid") FILL_CAT_AVG = PatternFill(start_color="BAEAEE", end_color="BAEAEE", fill_type="solid") FILL_CAT_CAGR = PatternFill(start_color="C4EFFF", end_color="C4EFFF", fill_type="solid") FILL_WEIGHTED_YELLOW = PatternFill(start_color="FFFF00", end_color="FFFF00", fill_type="solid") FILL_WEIGHTED_GREEN = PatternFill(start_color="C6EFCE", end_color="C6EFCE", fill_type="solid") FILL_WHITE = PatternFill(fill_type=None) FILL_WEIGHT_REF = PatternFill(start_color="EDEDED", end_color="EDEDED", fill_type="solid") # light grey weight row # Quartile fills FILL_QUARTILE_GREEN = PatternFill(start_color="C6EFCE", end_color="C6EFCE", fill_type="solid") FILL_QUARTILE_YELLOW = PatternFill(start_color="FFFF00", end_color="FFFF00", fill_type="solid") FILL_QUARTILE_ORANGE = PatternFill(start_color="FFC000", end_color="FFC000", fill_type="solid") FILL_QUARTILE_RED = PatternFill(start_color="FFC7CE", end_color="FFC7CE", fill_type="solid") # ── Fonts — Arial for identical rendering on macOS + Windows ───────────────── # openpyxl falls back gracefully when Arial is absent, but both platforms ship it. FONT_DEFAULT = Font(name="Arial", size=8, color="000000") FONT_DEFAULT_BOLD = Font(name="Arial", size=8, bold=True, color="000000") FONT_HEADER = Font(name="Arial", size=8, bold=True, color="000000") FONT_CAT_HEADER = Font(name="Arial", size=10, bold=True, color="000000") FONT_WEIGHT_REF = Font(name="Arial", size=7, italic=True, color="666666") # subtle grey label THIN = Side(border_style="thin", color="CCCCCC") BORDER_THIN = Border(left=THIN, right=THIN, top=THIN, bottom=THIN) # ─── Weight reference row data (advisor-revised March 2026) ────────────────── # Shown beneath every category's column-header row as a read-only reference. # Must match src/weightage.py WEIGHTS exactly. # ↑ = Top-10 (higher better), ↓ = Bottom-10 (lower better) WEIGHT_REF_ROW: Dict[str, str] = { "ter": "0.15 ↓", "turnover": "0.10 ↓", "cagr_3y": "0.40 ↑", "cagr_5y": "0.60 ↑", "cagr_10y": "0.75 ↑", "pe_ratio": "0.15 ↓", "alpha": "1.00 ↑*", # * = Light Red if α < 1 "std_dev": "1.00 ↓", "sharpe": "1.20 ↑", "sortino": "1.30 ↑", "down_capture": "1.00 ↓", "max_drawdown": "1.35 ↑", "info_ratio": "1.00 ↑*", # * = Light Red if IR < 0 "weightage": "10.00", } # ─── Column definitions ─────────────────────────────────────────────────────── # Tuple: (header_label, fund_attr, col_width, is_pct, decimal_places) # Widths are calibrated so wrap_text = True keeps cells readable without # the advisor needing to manually drag columns on either platform. XLSX_COLUMNS = [ ("Fund", "name", 40, False, 0), # A — wide: long fund names ("Benchmark Type", "benchmark", 22, False, 0), # B ("TER", "ter", 9, True, 4), # C ("Turn over (%)", "turnover", 11, True, 2), # D ("Mean", "mean", 9, False, 2), # E ("1 Year CAGR", "cagr_1y", 10, False, 2), # F ("3 Years CAGR", "cagr_3y", 10, False, 2), # G ("5 Years CAGR", "cagr_5y", 10, False, 2), # H ("10 Years CAGR", "cagr_10y", 11, False, 2), # I ("CAGR Since Inception", "cagr_inception", 14, False, 2), # J ("NAV", "nav", 10, False, 2), # K ("P/E Ratio", "pe_ratio", 10, False, 2), # L ("P/B Ratio", "pb_ratio", 10, False, 2), # M ("Alpha", "alpha", 10, False, 2), # N ("Volatility", "volatility", 10, False, 2), # O ("Beta", "beta", 9, False, 2), # P ("Standard Deviation", "std_dev", 14, False, 2), # Q ("Sharpe Ratio", "sharpe", 11, False, 2), # R ("Sortino Ratio", "sortino", 11, False, 2), # S ("Up Market Capture", "up_capture", 14, False, 2), # T ("Down Market Capture", "down_capture", 16, False, 2), # U ("Maximum Drawdown", "max_drawdown", 15, False, 2), # V ("R-Squared", "r_squared", 11, False, 2), # W ("Information Ratio", "info_ratio", 14, False, 2), # X ("Total Assets (in Cr)", "aum", 16, False, 1), # Y ("Weightage", "weightage", 11, False, 3), # Z — 3dp for precision ] NUM_COLS = len(XLSX_COLUMNS) def _to_float(val) -> Optional[float]: """Safely convert raw CSV value to float.""" if val is None: return None s = str(val).strip().replace('%', '').replace(',', '') if s in ('', '-', 'N/A*', 'N/A', 'nan', 'None'): return None try: return float(s) except ValueError: return None def _parse_ter(val) -> Optional[float]: """Parse TER value - CSV has percentage format like '1.40%', convert to decimal.""" if val is None: return None # Check if percentage BEFORE stripping is_pct = '%' in str(val) s = str(val).strip().replace('%', '').replace(',', '') if s in ('', '-', 'N/A*', 'N/A', 'nan', 'None'): return None try: v = float(s) # Convert percentage to decimal (e.g., 1.40 -> 0.014) if is_pct: v = v / 100 return v except ValueError: return None def _parse_turnover(val) -> Optional[float]: """Parse turnover value - CSV has percentage format like '20%', convert to decimal.""" if val is None: return None # Check if percentage BEFORE stripping is_pct = '%' in str(val) s = str(val).strip().replace('%', '').replace(',', '') if s in ('', '-', 'N/A*', 'N/A', 'nan', 'None'): return None try: v = float(s) # Convert percentage to decimal (e.g., 20 -> 0.20) if is_pct: v = v / 100 return v except ValueError: return None def _parse_launch_date(val) -> Optional[datetime]: """Parse launch date from CSV into datetime.""" if val is None: return None s = str(val).strip() if not s or s in ("-", "N/A", "N/A*"): return None for fmt in ("%d-%m-%Y", "%Y-%m-%d", "%d/%m/%Y"): try: return datetime.strptime(s, fmt) except ValueError: continue return None # ─── Auto-calculation for incomplete sections ──────────────────────────────────── def _calculate_category_averages(funds: List[Fund]) -> Dict[str, Dict[str, Any]]: """ Calculate category averages from fund-level category CAGR values. For categories without official data, extract category average values from fund rows. Uses the FIRST fund's category average value for each period. """ categories: Dict[str, List[Fund]] = {} # Group funds by category for fund in funds: if fund.category not in categories: categories[fund.category] = [] categories[fund.category].append(fund) cat_avg_data: Dict[str, Dict[str, Any]] = {} for cat_name, cat_funds in categories.items(): if not cat_funds: continue # Use the FIRST fund's category average values # This matches the CSV structure where all funds should have the same category average first_fund = cat_funds[0] cat_avg_data[cat_name] = { 'cagr_1y': first_fund.cagr_1y_cat if first_fund.cagr_1y_cat and first_fund.cagr_1y_cat != 0 else None, 'cagr_3y': first_fund.cagr_3y_cat if first_fund.cagr_3y_cat and first_fund.cagr_3y_cat != 0 else None, 'cagr_5y': first_fund.cagr_5y_cat if first_fund.cagr_5y_cat and first_fund.cagr_5y_cat != 0 else None, 'cagr_10y': first_fund.cagr_10y_cat if first_fund.cagr_10y_cat and first_fund.cagr_10y_cat != 0 else None, 'pe_ratio': None, 'pb_ratio': None, 'is_calculated': True # Flag to indicate this is calculated from fund data } return cat_avg_data def _calculate_benchmark_index(funds: List[Fund]) -> Dict[str, Dict[str, Any]]: """ Calculate BM Index from fund-level benchmark CAGR values. For categories without a BM Index row in CSV, extract benchmark values from fund rows. Uses the FIRST fund's benchmark value for each period. """ categories: Dict[str, List[Fund]] = {} # Group funds by category for fund in funds: if fund.category not in categories: categories[fund.category] = [] categories[fund.category].append(fund) bm_data: Dict[str, Dict[str, Any]] = {} for cat_name, cat_funds in categories.items(): if not cat_funds: continue # Use the FIRST fund's benchmark values # This matches the CSV structure where we take the first fund's data first_fund = cat_funds[0] bm_data[cat_name] = { 'cagr_1y': first_fund.cagr_1y_bm if first_fund.cagr_1y_bm is not None else None, 'cagr_3y': first_fund.cagr_3y_bm if first_fund.cagr_3y_bm is not None else None, 'cagr_5y': first_fund.cagr_5y_bm if first_fund.cagr_5y_bm is not None else None, 'cagr_10y': first_fund.cagr_10y_bm if first_fund.cagr_10y_bm is not None else None, 'is_calculated': True # Flag to indicate this is calculated from fund data } return bm_data # ─── CSV Loader ─────────────────────────────────────────────────────────────────── def load_fund_csv(csv_path: str) -> Tuple[List[Fund], Dict[str, Dict[str, Any]], Dict[str, Dict[str, Any]], Dict[str, int]]: """ Parse the fund-stats CSV and merge with reference data from Processed_data.xlsx. For sections with missing reference data, auto-calculates category averages from fund data. Returns: (funds, bm_data, cat_avg_data, fund_weightages) """ csv_path = Path(csv_path) if not csv_path.exists(): raise FileNotFoundError(f"CSV not found: {csv_path}") # Load reference data from Processed_data.xlsx ref_bm_data, ref_cat_avg_data, ref_fund_weightages = extract_reference_data(DEFAULT_REFERENCE_PATH) funds: List[Fund] = [] current_category = "Unknown" bm_data: Dict[str, Dict[str, Any]] = {} cat_avg_data: Dict[str, Dict[str, Any]] = {} with open(csv_path, encoding='utf-8-sig', errors='replace') as f: reader = csv.reader(f) rows = list(reader) # DYNAMIC COLUMN DETECTION - Read header row first if not rows: raise ValueError("CSV file is empty") header = [str(col).strip() for col in rows[0]] col_map = {name: idx for idx, name in enumerate(header)} print(f"Detected CSV format with {len(header)} columns") # Detect format based on column names has_category_col = 'Category' in col_map has_scheme_code = 'Scheme Code' in col_map if has_category_col and has_scheme_code: print(" Format: NEW (36 columns with Category column)") else: print(" Format: OLD (35 columns without Category column)") pending_bm: Dict[str, Dict[str, Any]] = {} pending_cat_avg: Dict[str, Dict[str, Any]] = {} seen_fund_category: set[tuple[str, str]] = set() deduped_rows = 0 # Helper to get column index safely def get_col_idx(col_name: str) -> Optional[int]: return col_map.get(col_name) for row_idx, row in enumerate(rows): if row_idx == 0: # Skip header row continue if not row: continue col0 = str(row[0]).strip() # Category header - detect by checking if most columns are empty # Category headers are standalone rows with category name in col0 and empty data columns # This catches: "Equity: Large Cap", "Childrens Fund", "ETFs", "Retirement Fund", etc. # But NOT "BM Index" or "Category Average" rows if col0 not in ('BM Index', 'Category Average', '', 'nan'): # Check if this looks like a category header (columns 2-10 are empty) # For old format: check columns 2-10 (Benchmark Type is col 1, so skip it) # For new format: check columns 2-10 (Category is col 1, so skip it) check_cols = row[2:11] if len(row) > 10 else row[2:6] non_empty_count = sum(1 for cell in check_cols if str(cell).strip() not in ('', 'nan', 'None', '-')) if non_empty_count == 0 and len(col0) > 3: # All checked columns are empty - this is a category header current_category = col0 # Use reference data if available, otherwise use CSV data (which may be empty) if current_category in ref_bm_data: pending_bm[current_category] = ref_bm_data[current_category] else: pending_bm[current_category] = None if current_category in ref_cat_avg_data: pending_cat_avg[current_category] = ref_cat_avg_data[current_category] else: pending_cat_avg[current_category] = None continue # BM Index row - skip, we're using reference data if col0 == 'BM Index': continue # Category Average row - skip, we're using reference data if col0 == 'Category Average': continue # Skip header rows (repeated headers in CSV) if col0 == 'Fund' and len(row) > 1: # Check if this is a header row by looking at column 1 col1 = str(row[1]).strip() if len(row) > 1 else '' if col1 in ('Benchmark Type', 'Category'): continue if col0 in ('', 'nan'): continue # Parse fund using dynamic column mapping def g(col_name: str) -> Optional[float]: idx = get_col_idx(col_name) if idx is None: return None try: return _to_float(row[idx]) except (IndexError, TypeError): return None def get_str(col_name: str) -> str: idx = get_col_idx(col_name) if idx is None: return "" try: return str(row[idx]).strip() except (IndexError, TypeError): return "" # Get category - either from Category column or from current_category if has_category_col: fund_category = get_str('Category') or current_category else: fund_category = current_category # Get benchmark benchmark = get_str('Benchmark Type') # Get TER and Turnover with special parsing ter_idx = get_col_idx('TER') ter_val = _parse_ter(row[ter_idx]) if ter_idx is not None and len(row) > ter_idx else None turnover_idx = get_col_idx('Turn over (%)') turnover_val = _parse_turnover(row[turnover_idx]) if turnover_idx is not None and len(row) > turnover_idx else None dedupe_key = (col0.strip().lower(), fund_category.strip().lower()) if dedupe_key in seen_fund_category: deduped_rows += 1 continue seen_fund_category.add(dedupe_key) fund = Fund( name=col0, category=fund_category, benchmark=benchmark, ter=ter_val, turnover=turnover_val, mean=g('Mean'), cagr_1y=g('1 Year CAGR'), cagr_1y_cat=g('1 Year Category CAGR'), cagr_1y_bm=g('1 Year Benchmark CAGR'), cagr_3y=g('3 Years CAGR'), cagr_3y_cat=g('3 Years Category CAGR'), cagr_3y_bm=g('3 Years Benchmark CAGR'), cagr_5y=g('5 Years CAGR'), cagr_5y_cat=g('5 Years Category CAGR'), cagr_5y_bm=g('5 Years Benchmark CAGR'), cagr_10y=g('10 Years CAGR'), cagr_10y_cat=g('10 Years Category CAGR'), cagr_10y_bm=g('10 Years Benchmark CAGR'), cagr_inception=g('CAGR Since Inception'), nav=g('NAV'), pe_ratio=g('P/E Ratio'), pb_ratio=g('P/B Ratio'), alpha=g('Alpha'), beta=g('Beta'), std_dev=g('Standard Deviation'), sharpe=g('Sharpe Ratio'), volatility=g('Volatility'), sortino=g('Sortino Ratio'), up_capture=g('Up Market Capture\nRatio') or g('Up Market Capture'), down_capture=g('Down Market Capture\nRatio') or g('Down Market Capture'), max_drawdown=g('Maximum Drawdown'), r_squared=g('R-Squared'), info_ratio=g('Information Ratio'), aum=g('Total Assets (in Cr)'), fill_status=get_str('Fill Status') or None, ) # Preserve scheme code for downstream NAV / drawdown fixes scheme_code_str = get_str('Scheme Code') if scheme_code_str: setattr(fund, "_scheme_code", scheme_code_str) launch_dt = _parse_launch_date(get_str('Launch Date')) if launch_dt: setattr(fund, "_launch_date", launch_dt) fund.order = len(funds) # Preserve original CSV order for tiebreaker funds.append(fund) if deduped_rows: print(f" Deduplicated {deduped_rows} rows by (Fund, Category) at ingest") # Calculate category averages from fund data calculated_cat_avg = _calculate_category_averages(funds) # Calculate BM Index from fund-level benchmark data calculated_bm = _calculate_benchmark_index(funds) # Assign BM and Category Average data - ONLY use calculated data from CSV # DO NOT use reference data from Processed_data.xlsx for cat_name in set(f.category for f in funds): # BM Index: Always use calculated data from fund benchmark values bm_data[cat_name] = calculated_bm.get(cat_name, {}) # Category Average: Always use calculated data from fund category values cat_avg_data[cat_name] = calculated_cat_avg.get(cat_name, {}) return funds, bm_data, cat_avg_data, ref_fund_weightages def _fmt(val, decimals=2) -> Optional[float]: """Return rounded float or None.""" if val is None: return None try: return round(float(val), decimals) except (ValueError, TypeError): return None def _quartile_band_for_position(pos: int, total: int) -> Optional[int]: """ Return quartile band by positional rank (0-based) after sorting by score desc. Band mapping: - 0: Top quartile (Green) - 1: Upper-middle quartile (Yellow) - 2: Lower-middle quartile (Orange) - 3: Bottom quartile (Red) Uses rank-positioning (not score thresholds), so ties do not distort quartile sizes. """ if total <= 0 or pos < 0 or pos >= total: return None # Keep intuitive behavior for tiny categories. if total == 1: return 0 if total == 2: return 0 if pos == 0 else 3 if total == 3: if pos == 0: return 0 if pos == 1: return 1 return 3 q1_end = math.ceil(total * 0.25) q2_end = math.ceil(total * 0.50) q3_end = math.ceil(total * 0.75) if pos < q1_end: return 0 if pos < q2_end: return 1 if pos < q3_end: return 2 return 3 def _calculate_weightage(fund: Fund, cat_avg_vals: Dict[str, Any]) -> int: """ DEPRECATED: Legacy CAGR-based weightage calculation. Use compute_scores() from weightage.py for AI-suggested model. Calculate weightage based on period-weighted scoring against Category Average. Period weights: - 1 Year CAGR: 2 pts if fund beats Category Average - 3 Years CAGR: 3 pts if fund beats Category Average - 5 Years CAGR: 4 pts if fund beats Category Average - 10 Years CAGR: 5 pts if fund beats Category Average Max possible: 14 pts Note: Treat 0, N/A*, or - as "no data" (skip comparison) """ weightage = 0 # Period weights mapping period_weights = { 'cagr_1y': 2, 'cagr_3y': 3, 'cagr_5y': 4, 'cagr_10y': 5, } for attr, weight in period_weights.items(): fund_val = getattr(fund, attr, None) cat_avg_val = cat_avg_vals.get(attr) if cat_avg_vals else None # Skip if fund value is 0, None, or invalid if fund_val is None or fund_val == 0: continue if cat_avg_val is None or cat_avg_val == 0: continue # Award points if fund beats category average if fund_val > cat_avg_val: weightage += weight return weightage def _calculate_green_cell_weightage(fund: Fund, all_funds_in_category: List[Fund]) -> int: """ Calculate weightage as the count of GREEN cells (top 10 rankings). Matches Excel conditional formatting rules: - Only metrics with GREEN highlighting are counted - Bottom 10 metrics get RED highlighting (not counted) GREEN metrics (Top 10 = Green): - CAGR columns: F, G, H, I (1Y, 3Y, 5Y, 10Y) - Top 10 columns: J, N, R, S, T, X, Y (Inception, Alpha, Sharpe, Sortino, UpCapture, InfoRatio, Assets) Total possible: 11 green cells """ green_count = 0 # Only metrics that get GREEN highlighting in Excel (Top 10 = Green) green_metrics = [ 'cagr_1y', # Column F 'cagr_3y', # Column G 'cagr_5y', # Column H 'cagr_10y', # Column I 'cagr_inception', # Column J 'alpha', # Column N 'sharpe', # Column R 'sortino', # Column S 'up_capture', # Column T 'info_ratio', # Column X 'aum' # Column Y (Assets) ] # Check each metric that gets GREEN highlighting for metric in green_metrics: if _is_in_top_10(fund, all_funds_in_category, metric, higher_is_better=True): green_count += 1 return green_count def _is_in_top_10(fund: Fund, all_funds: List[Fund], metric: str, higher_is_better: bool) -> bool: """ Check if a fund is in top 10 for a given metric within its category. Args: fund: The fund to check all_funds: All funds in the same category metric: The metric attribute name (e.g., 'cagr_1y', 'ter') higher_is_better: True if higher values are better, False if lower is better Returns: True if fund is in top 10, False otherwise """ fund_val = getattr(fund, metric, None) # Skip if fund doesn't have this metric if fund_val is None or fund_val == 0: return False # Collect all valid values for this metric in the category valid_values = [] for f in all_funds: val = getattr(f, metric, None) if val is not None and val != 0: valid_values.append(val) # Need at least 10 funds with data to have a top 10 if len(valid_values) < 10: # If fewer than 10 funds, check if fund is in top half if len(valid_values) < 2: return False valid_values.sort(reverse=higher_is_better) threshold_idx = len(valid_values) // 2 threshold = valid_values[threshold_idx] if higher_is_better: return fund_val >= threshold else: return fund_val <= threshold # Sort values to find top 10 threshold valid_values.sort(reverse=higher_is_better) # Count how many funds are strictly better than this fund if higher_is_better: better_count = sum(1 for v in valid_values if v > fund_val) else: better_count = sum(1 for v in valid_values if v < fund_val) # Fund is in top 10 if 9 or fewer funds are strictly better (ranks 1-10) return better_count <= 9 def _get_cagr_font_color() -> Font: """ NO font coloring - always return default black font. Per instructions: "CRITICAL: NO green/red font coloring anywhere" """ return FONT_DEFAULT def _apply_conditional_formatting(ws, start_row: int, end_row: int, cat_avg_vals: Dict[str, Any]): """ Apply conditional formatting rules per MF_Scoring_Model.md Light Green (C6EFCE) + Dark Green Text (006100) for: - Top 10: CAGR (all periods), Alpha, Sharpe, Sortino, Up Capture, R-Squared, Info Ratio, Total Assets, CAGR Since Inception - Bottom 10: TER, Turnover, Beta, Std Dev, Down Capture, P/E, P/B, Max Drawdown Light Red (FFC7CE) for threshold violations: - Alpha < 1 - Info Ratio < 0 - CAGR < Category Average (all periods) """ if start_row >= end_row: return # Define colors for conditional formatting green_fill = PatternFill(start_color="C6EFCE", end_color="C6EFCE", fill_type="solid") green_font = Font(color="006100") red_fill = PatternFill(start_color="FFC7CE", end_color="FFC7CE", fill_type="solid") red_font = Font(color="9C0006") # ═══════════════════════════════════════════════════════════════════════════ # DUAL-CONDITION COLUMNS (Green for Top 10, Red for threshold violations) # ═══════════════════════════════════════════════════════════════════════════ # CAGR columns: Green for Top 10, Red if < Category Average cagr_cols = { 'F': (6, cat_avg_vals.get('cagr_1y')), # 1 Year CAGR 'G': (7, cat_avg_vals.get('cagr_3y')), # 3 Years CAGR 'H': (8, cat_avg_vals.get('cagr_5y')), # 5 Years CAGR 'I': (9, cat_avg_vals.get('cagr_10y')), # 10 Years CAGR } for col_letter, (col_num, cat_avg) in cagr_cols.items(): range_str = f"{col_letter}{start_row}:{col_letter}{end_row}" # Rule 1: Red if < Category Average (higher priority) if cat_avg is not None: rule_red = CellIsRule( operator='lessThan', formula=[str(cat_avg)], stopIfTrue=True, # Stop if red applies fill=red_fill, font=red_font ) ws.conditional_formatting.add(range_str, rule_red) # Rule 2: Green for Top 10 rule_green = Rule( type='top10', rank=10, stopIfTrue=False ) rule_green.dxf = DifferentialStyle(fill=green_fill, font=green_font) ws.conditional_formatting.add(range_str, rule_green) # Alpha (Col N = 14): Green for Top 10, Red if < 1 range_str = f"N{start_row}:N{end_row}" rule_red = CellIsRule( operator='lessThan', formula=['1'], stopIfTrue=True, fill=red_fill, font=red_font ) ws.conditional_formatting.add(range_str, rule_red) rule_green = Rule(type='top10', rank=10, stopIfTrue=False) rule_green.dxf = DifferentialStyle(fill=green_fill, font=green_font) ws.conditional_formatting.add(range_str, rule_green) # Information Ratio (Col X = 24): Green for Top 10, Red if < 0 range_str = f"X{start_row}:X{end_row}" rule_red = CellIsRule( operator='lessThan', formula=['0'], stopIfTrue=True, fill=red_fill, font=red_font ) ws.conditional_formatting.add(range_str, rule_red) rule_green = Rule(type='top10', rank=10, stopIfTrue=False) rule_green.dxf = DifferentialStyle(fill=green_fill, font=green_font) ws.conditional_formatting.add(range_str, rule_green) # ═══════════════════════════════════════════════════════════════════════════ # TOP 10 COLUMNS (Green - Higher is Better) # ═══════════════════════════════════════════════════════════════════════════ top10_cols = { 'J': 'CAGR Since Inception', 'R': 'Sharpe Ratio', 'S': 'Sortino Ratio', 'T': 'Up Market Capture', 'W': 'R-Squared', 'Y': 'Total Assets' } for col_letter, name in top10_cols.items(): range_str = f"{col_letter}{start_row}:{col_letter}{end_row}" rule = Rule(type='top10', rank=10, stopIfTrue=False) rule.dxf = DifferentialStyle(fill=green_fill, font=green_font) ws.conditional_formatting.add(range_str, rule) # Maximum Drawdown (Col V): Top 10 among NON-ZERO values only. # This keeps zeros as "no data" and avoids green highlighting for zero entries. v_range = f"V{start_row}:V{end_row}" # Guard against text placeholders like "NA": Excel treats "NA" <> 0 as TRUE, # which can incorrectly qualify the cell for highlighting. Only numeric values participate. v_formula = ( f'AND(' f'ISNUMBER(V{start_row}),' f'V{start_row}<>0,' f'COUNTIFS($V${start_row}:$V${end_row},\">\"&V{start_row},$V${start_row}:$V${end_row},\"<>0\")<10' f')' ) v_rule = FormulaRule(formula=[v_formula], stopIfTrue=False, fill=green_fill, font=green_font) ws.conditional_formatting.add(v_range, v_rule) # ═══════════════════════════════════════════════════════════════════════════ # BOTTOM 10 COLUMNS (Green - Lower is Better) # ═══════════════════════════════════════════════════════════════════════════ bottom10_cols = { 'C': 'TER', 'D': 'Turnover', 'L': 'P/E Ratio', 'P': 'Beta', 'Q': 'Standard Deviation', 'U': 'Down Market Capture' } for col_letter, name in bottom10_cols.items(): range_str = f"{col_letter}{start_row}:{col_letter}{end_row}" rule = Rule( type='top10', rank=10, bottom=True, # Bottom 10 = lowest values stopIfTrue=False ) rule.dxf = DifferentialStyle(fill=green_fill, font=green_font) ws.conditional_formatting.add(range_str, rule) def export_excel(funds: List[Fund], output_path: str, bm_data: Dict[str, Dict[str, Any]] = None, cat_avg_data: Dict[str, Dict[str, Any]] = None) -> str: """Build the processed Excel matching target format exactly.""" output_path = Path(output_path) output_path.parent.mkdir(parents=True, exist_ok=True) if bm_data is None: bm_data = {} if cat_avg_data is None: cat_avg_data = {} wb = Workbook() ws = wb.active ws.title = "Sheet2" na_audit_rows: List[str] = [] # Apply NA policy to all numeric export columns. # Exclusions are text/derived columns that should stay as-is. na_on_zero_attrs = { attr for _, attr, _, _, _ in XLSX_COLUMNS if attr and attr not in {"name", "benchmark", "weightage"} } cagr_period_by_attr = { "cagr_1y": 1, "cagr_3y": 3, "cagr_5y": 5, "cagr_10y": 10, } def _years_since_launch(fund_obj: Fund) -> Optional[float]: launch_dt = getattr(fund_obj, "_launch_date", None) if not isinstance(launch_dt, datetime): return None return max(0.0, (datetime.now() - launch_dt).days / 365.25) def _audit_na(row_type: str, category: str, fund_name: str, attr: str, reason: str) -> None: na_audit_rows.append( f"{row_type}\t{category}\t{fund_name}\t{attr}\t{reason}" ) def _display_numeric_or_na( *, attr: str, value: Any, row_type: str, category: str, fund_obj: Optional[Fund] = None, fund_name: str = "", decimals: int = 2, ) -> Any: """ Convert numeric value to rounded float or 'NA' for missing/invalid values. Also appends NA decisions to audit rows. Category Average: PE and PB show blank (not NA) when missing. """ # Category Average row: PE and PB stay blank when missing if row_type == "CATEGORY_AVG" and attr in ("pe_ratio", "pb_ratio"): if value is None: return None try: num = float(value) return round(num, decimals) if num != 0 else None except (TypeError, ValueError): return None if attr in na_on_zero_attrs: if value is None: _audit_na(row_type, category, fund_name, attr, "missing value") return "NA" try: num = float(value) except (TypeError, ValueError): _audit_na(row_type, category, fund_name, attr, "non-numeric value") return "NA" if num == 0: # Duration-aware reason for CAGR periods when launch date exists. if fund_obj is not None and attr in cagr_period_by_attr: years = _years_since_launch(fund_obj) period = cagr_period_by_attr[attr] if years is not None and years < period: _audit_na( row_type, category, fund_name, attr, f"fund age {years:.2f}y < required {period}y", ) else: _audit_na(row_type, category, fund_name, attr, "source value is 0") else: _audit_na(row_type, category, fund_name, attr, "source value is 0") return "NA" return round(num, decimals) # Non-NA-managed attributes use existing behavior. if value is None: return None try: return round(float(value), decimals) except (TypeError, ValueError): return value # ── Row 1: Column headers (include weight hints for scored metrics) ───── ws.row_dimensions[1].height = 36 for col_idx, (header, attr, width, _, _) in enumerate(XLSX_COLUMNS, start=1): # If this column participates in the scoring model, append its weight # so the advisor can see weights even when scrolled deep into a category. weight_hint = WEIGHT_REF_ROW.get(attr) if weight_hint: header_value = f"{header}\n({weight_hint})" else: header_value = header cell = ws.cell(row=1, column=col_idx, value=header_value) cell.fill = FILL_HEADER cell.font = FONT_HEADER cell.border = BORDER_THIN cell.alignment = Alignment(horizontal="center", vertical="center", wrap_text=True) ws.column_dimensions[get_column_letter(col_idx)].width = width # Freeze col A + row 1 so fund names and headers stay visible while scrolling ws.freeze_panes = "B2" # ── Group funds by category ──────────────────────────────────────────────── categories: Dict[str, List[Fund]] = {} category_order = [] for fund in funds: if fund.category not in categories: category_order.append(fund.category) categories.setdefault(fund.category, []).append(fund) current_row = 2 for idx, cat_name in enumerate(category_order): cat_funds = categories[cat_name] # Sort by score (displayed value) descending so Weightage column is strictly largest-to-lowest sorted_funds = sorted( cat_funds, key=lambda f: (-(f.score or 0), (f.name or "").lower(), getattr(f, 'order', 0)), ) # Quartiles by positional rank, not by score thresholds. # This guarantees consistent quartile sizing even when many funds share the same score. quartile_by_fund_id: Dict[int, int] = {} for pos, fund in enumerate(sorted_funds): band = _quartile_band_for_position(pos, len(sorted_funds)) if band is not None: quartile_by_fund_id[id(fund)] = band # ── Header row (repeat before each category except first) ───────────── if idx > 0: ws.row_dimensions[current_row].height = 32 for col_idx, (header, _, _, _, _) in enumerate(XLSX_COLUMNS, start=1): cell = ws.cell(row=current_row, column=col_idx, value=header) cell.fill = FILL_HEADER cell.font = FONT_HEADER cell.border = BORDER_THIN cell.alignment = Alignment(horizontal="center", vertical="center", wrap_text=True) current_row += 1 # ── Category header row ─────────────────────────────────────────────── ws.row_dimensions[current_row].height = 20 for col_idx in range(1, NUM_COLS + 1): cell = ws.cell(row=current_row, column=col_idx) cell.fill = FILL_WHITE cell.border = BORDER_THIN cat_cell = ws.cell(row=current_row, column=1, value=cat_name) cat_cell.font = FONT_CAT_HEADER cat_cell.alignment = Alignment(horizontal="left", vertical="center", wrap_text=True) ws.merge_cells(start_row=current_row, start_column=1, end_row=current_row, end_column=NUM_COLS - 1) current_row += 1 # ── BM Index row ─────────────────────────────────────────────────────── bm_vals = bm_data.get(cat_name, {}) ws.row_dimensions[current_row].height = 14 for col_idx, (header, attr, _, _, _) in enumerate(XLSX_COLUMNS, start=1): val = None if col_idx == 1: val = "BM Index" elif attr in bm_vals: val = _display_numeric_or_na( attr=attr, value=bm_vals[attr], row_type="BM_INDEX", category=cat_name, fund_name="BM Index", decimals=2, ) cell = ws.cell(row=current_row, column=col_idx, value=val) if col_idx == 1: cell.fill = FILL_BM_ROW elif col_idx in [6, 7, 8, 9]: cell.fill = FILL_BM_CAGR else: cell.fill = FILL_WHITE cell.font = FONT_DEFAULT_BOLD cell.border = BORDER_THIN cell.alignment = Alignment( horizontal="right" if col_idx > 2 else "left", vertical="center", wrap_text=(col_idx == 1) ) current_row += 1 # ── Category Average row ────────────────────────────────────────────── cat_avg_vals = cat_avg_data.get(cat_name, {}) ws.row_dimensions[current_row].height = 14 for col_idx, (header, attr, _, _, _) in enumerate(XLSX_COLUMNS, start=1): val = None if col_idx == 1: val = "Category Average" elif attr in cat_avg_vals: val = _display_numeric_or_na( attr=attr, value=cat_avg_vals[attr], row_type="CATEGORY_AVG", category=cat_name, fund_name="Category Average", decimals=2, ) cell = ws.cell(row=current_row, column=col_idx, value=val) if col_idx == 1: cell.fill = FILL_CAT_AVG elif col_idx in [6, 7, 8, 9, 12, 13]: cell.fill = FILL_CAT_CAGR else: cell.fill = FILL_WHITE cell.font = FONT_DEFAULT_BOLD cell.border = BORDER_THIN cell.alignment = Alignment( horizontal="right" if col_idx > 2 else "left", vertical="center", wrap_text=(col_idx == 1) ) current_row += 1 # ── Fund rows ───────────────────────────────────────────────────────── fund_start_row = current_row top_5_fund_ids = {id(f) for f in sorted_funds[:5]} for fund in sorted_funds: # 36pt height = comfortable 2-line display for long fund names # without the advisor needing to drag rows on macOS or Windows ws.row_dimensions[current_row].height = 36 weightage = fund.score or 0 score_val = round(weightage, 3) is_top_5 = id(fund) in top_5_fund_ids for col_idx, (header, attr, _, _, decimals) in enumerate(XLSX_COLUMNS, start=1): if attr == "weightage": val = score_val cell_font = FONT_DEFAULT_BOLD if is_top_5 else FONT_DEFAULT elif attr: raw_val = getattr(fund, attr, None) if attr in ('name', 'benchmark'): val = raw_val if raw_val else None cell_font = FONT_DEFAULT_BOLD if (col_idx == 1 and is_top_5) else FONT_DEFAULT else: val = _display_numeric_or_na( attr=attr, value=raw_val, row_type="FUND", category=fund.category, fund_obj=fund, fund_name=fund.name, decimals=decimals, ) cell_font = FONT_DEFAULT else: val = None cell_font = FONT_DEFAULT cell = ws.cell(row=current_row, column=col_idx, value=val) if is_top_5 and col_idx == 1: cell.fill = FILL_WEIGHTED_YELLOW elif attr == "weightage": quartile_band = quartile_by_fund_id.get(id(fund)) if quartile_band == 0: cell.fill = FILL_QUARTILE_GREEN elif quartile_band == 1: cell.fill = FILL_QUARTILE_YELLOW elif quartile_band == 2: cell.fill = FILL_QUARTILE_ORANGE elif quartile_band == 3: cell.fill = FILL_QUARTILE_RED else: cell.fill = FILL_WHITE else: cell.fill = FILL_WHITE cell.font = cell_font cell.border = BORDER_THIN cell.alignment = Alignment( horizontal="left" if col_idx <= 2 else "right", vertical="top", # top-align so wrapped text reads naturally wrap_text=True, # prevents truncation on any screen or zoom level ) if col_idx == 3: cell.number_format = '0.00%' elif col_idx == 4: cell.number_format = '0.00%' elif attr == "weightage": cell.number_format = '0.000' current_row += 1 # Apply conditional formatting to this section's fund rows fund_end_row = current_row - 1 if fund_end_row >= fund_start_row and cat_avg_vals: _apply_conditional_formatting(ws, fund_start_row, fund_end_row, cat_avg_vals) wb.save(str(output_path)) if na_audit_rows: audit_path = output_path.with_name(f"{output_path.stem}_na_audit.txt") lines = [ "NA AUDIT TRACE", f"Generated: {datetime.now().isoformat()}", "Columns: row_typecategoryfund_namemetric_attrreason", "-" * 80, *na_audit_rows, ] audit_path.write_text("\n".join(lines), encoding="utf-8") print(f"NA audit trace written: {audit_path}") return str(output_path) def _avg(values: List[Optional[float]]) -> Optional[float]: """Compute average of non-None values.""" valid = [v for v in values if v is not None] if not valid: return None return round(sum(valid) / len(valid), 2) # ─── Pipeline entry ──────────────────────────────────────────────────────────────── def run_data_engine(csv_path: str, output_path: str = "output/fund_analysis.xlsx", use_comprehensive_scoring: bool = True) -> List[Fund]: """ Full pipeline: load -> score -> export Excel. Args: csv_path: Path to the fund-stats CSV file output_path: Path to save the output Excel file use_comprehensive_scoring: If True, uses AI-suggested model (10-point scale with Top/Bottom 10). If False, uses legacy CAGR-based weightage. """ print(f"Loading fund data from: {csv_path}") funds, bm_data, cat_avg_data, ref_fund_weightages = load_fund_csv(csv_path) print(f" Loaded {len(funds)} fund schemes") # Proactively fix zero / missing drawdown cells using live NAV history # so Maximum Drawdown can participate in scoring instead of staying at 0. try: fixed_mdd = drawdown_zero_fix(funds, verbose=True) if fixed_mdd: print(f" Fixed {fixed_mdd} zero/missing drawdown cells via NAV engine") except Exception as exc: print(f" WARNING: drawdown_zero_fix failed: {exc}") if use_comprehensive_scoring: # Use AI-suggested model (10-point scale) print(" Using AI-suggested scoring model (10-point scale with Top/Bottom 10)...") # Import and use the new compute_scores function funds = compute_scores(funds) # Copy score to weightage field for Excel export compatibility for fund in funds: fund.weightage = int(round(fund.score)) if fund.score else 0 with_highlight = sum(1 for f in funds if (f.score or 0) > 8) print(f" Calculated AI-suggested weightage. {with_highlight} funds have score > 8") else: # Use legacy CAGR-based weightage print(" Using legacy CAGR-based weightage...") for fund in funds: cat_avg_vals = cat_avg_data.get(fund.category, {}) fund.weightage = _calculate_weightage(fund, cat_avg_vals) fund.score = float(fund.weightage) with_highlight = sum(1 for f in funds if (f.weightage or 0) > 8) print(f" Calculated weightage. {with_highlight} funds have weightage > 8") print(f"Exporting processed Excel to: {output_path}") path = export_excel(funds, output_path, bm_data, cat_avg_data) print(f"Done! Saved: {path}") return funds