"""Normalization layer — converts raw MySQL handbook content into typed render blocks. Each section_json from the database is parsed into a list of RenderBlock objects. Every block has a `block_type` that maps 1-to-1 to a Jinja partial and a CSS class. This prevents ad-hoc interpretation of raw JSON throughout the rendering pipeline. Block types (from theme.BLOCK_TYPES): heading_1, heading_2, paragraph, bullet_list, note, table, enrollment_steps, school_profile, university_summary, toc, cover, full_page_image """ from __future__ import annotations import re from dataclasses import dataclass, field from typing import Any from app.services.utils import ( emphasize_keywords, format_money_figures, get_any, h, hb_slug, is_assoc, is_truthy, ) from app.services.renderers import fetch_image_data_uri # ─────────────────────────────────────────────────────────────── # Block data-classes # ─────────────────────────────────────────────────────────────── @dataclass class RenderBlock: """Base typed render block.""" block_type: str css_class: str = "" data: dict[str, Any] = field(default_factory=dict) # ─────────────────────────────────────────────────────────────── # Section → blocks # ─────────────────────────────────────────────────────────────── def normalize_section( section_key: str, section_title: str, section_json: dict | list, *, universities: list[dict] | None = None, debug: bool = False, ) -> list[RenderBlock]: """Convert a single global section payload into a list of RenderBlocks. This is the single translation point between the database schema and the rendering layer. """ blocks: list[RenderBlock] = [] key_norm = section_key.lower().strip() if not isinstance(section_json, dict): section_json = {} layout_norm = str(section_json.get("layout", "")).lower().strip() # ── Summary of universities ── if key_norm == "summary_of_universities": blocks.extend(_normalize_university_summary( section_title, section_json, layout_norm, universities or [], )) return blocks # ── Section heading ── title = section_title.strip() if title and key_norm != "table_of_contents": blocks.append(RenderBlock( block_type="heading_1", css_class="hb-heading-1", data={"text": title}, )) # ── Steps → enrollment_steps ── steps = section_json.get("steps") if isinstance(steps, list): blocks.append(RenderBlock( block_type="enrollment_steps", css_class="hb-enrollment-steps", data={"steps": _normalize_steps(steps)}, )) return blocks # ── Bullets ── has_bullets = isinstance(section_json.get("bullets"), list) has_items = isinstance(section_json.get("items"), list) if has_bullets or (layout_norm == "bullets_with_note" and has_items): from markupsafe import Markup lst = section_json.get("items") if has_items else section_json.get("bullets") items = [format_money_figures(str(b).strip()) for b in lst if str(b).strip()] html_items = [Markup(emphasize_keywords(it)) for it in items] blocks.append(RenderBlock( block_type="bullet_list", css_class="hb-bullet-list", data={"entries": html_items, "html_entries": True}, )) note = format_money_figures( str(section_json.get("note", section_json.get("footnote", ""))).strip() ) if note: blocks.append(RenderBlock( block_type="note", css_class="hb-note", data={"text": note}, )) return blocks # ── Basic table ── cols = section_json.get("columns") rows = section_json.get("rows") if isinstance(cols, list) and isinstance(rows, list): blocks.append(_normalize_basic_table(cols, rows)) return blocks # ── table_v2 ── if layout_norm == "table_v2": blocks.append(_normalize_table_v2(section_json)) return blocks # ── doc_v1 ── if layout_norm == "doc_v1" and isinstance(section_json.get("blocks"), list): blocks.extend(_normalize_doc_v1(section_json["blocks"])) return blocks # ── Fallback ── if "text" in section_json: text = format_money_figures(str(section_json["text"])) if text.strip(): from markupsafe import Markup blocks.append(RenderBlock( block_type="paragraph", css_class="hb-paragraph", data={ "text": text, "html": Markup(emphasize_keywords(text)), }, )) return blocks # ─────────────────────────────────────────────────────────────── # University profile normalisation # ─────────────────────────────────────────────────────────────── def normalize_university( uni_raw: dict[str, Any], allow_remote: bool, include_inactive_programs: bool, debug: bool, stats: dict[str, Any], ) -> RenderBlock: """Convert raw university data into a school_profile RenderBlock.""" uni_name = uni_raw["name"] sections = uni_raw.get("sections", []) is_first = uni_raw.get("_is_first", False) stats["universities"] = stats.get("universities", 0) + 1 # Build section map; merge duplicate "programs" sections sec_map: dict[str, dict] = {} for s in sections: if not isinstance(s, dict): continue k = str(s.get("section_key", "")) if not k: continue if k == "programs" and k in sec_map: existing = sec_map["programs"].get("section_json", {}) incoming = s.get("section_json", {}) if not isinstance(existing, dict): existing = {} if not isinstance(incoming, dict): incoming = {} a = existing.get("programs", []) b = incoming.get("programs", []) if not isinstance(a, list): a = [] if not isinstance(b, list): b = [] existing["programs"] = a + b sec_map["programs"]["section_json"] = existing continue sec_map[k] = s # Campus image img_section = sec_map.get("campus_image") or sec_map.get("image") campus_image = "" campus_caption = "" if img_section: j = img_section.get("section_json", {}) if isinstance(j, dict): campus_url = str(j.get("image_url", "")).strip() campus_caption = str(j.get("caption", "")).strip() if allow_remote and campus_url: embedded = fetch_image_data_uri(campus_url) if embedded: campus_image = embedded stats["images_embedded"] = stats.get("images_embedded", 0) + 1 else: stats["images_placeholder"] = stats.get("images_placeholder", 0) + 1 else: stats["images_placeholder"] = stats.get("images_placeholder", 0) + 1 # Overview and website resolved_website = (uni_raw.get("website") or "").strip() overview_data = None if "overview" in sec_map: overview_json = sec_map["overview"].get("section_json", {}) if not isinstance(overview_json, dict): overview_json = {} site_from_overview = get_any( overview_json, ["university_website", "university_website_url", "website", "site", "url", "homepage", "web_url"], ) if not resolved_website and site_from_overview: resolved_website = site_from_overview overview_data = { "founded": get_any(overview_json, ["founded", "Founded"]), "total_students": get_any(overview_json, ["total_students", "Total Students"]), "undergraduates": get_any(overview_json, [ "undergraduates", "Undergraduate Students", "undergraduate_students", ]), "postgraduates": get_any(overview_json, [ "postgraduate_students", "Postgraduate Students", ]), "acceptance_rate": get_any(overview_json, ["acceptance_rate", "Acceptance Rate"]), "location": get_any(overview_json, ["location", "Location"]), "tuition": get_any(overview_json, [ "tuition_out_of_state_yearly", "Yearly Out of State Tuition Fees", "Yearly Out-of-State Tuition Fees", "Yearly Tuition Fees", "Yearly Out-of-State Tuition Fees:", ]), } if resolved_website: stats["university_links"] = stats.get("university_links", 0) + 1 stats["website_rows"] = stats.get("website_rows", 0) + 1 # Benefits benefits = None if "benefits" in sec_map: j = sec_map["benefits"].get("section_json", {}) if not isinstance(j, dict): j = {} raw_benefits = j.get("benefits", []) if isinstance(raw_benefits, list): benefits = [str(b).strip() for b in raw_benefits if str(b).strip()] else: benefits = [] # Programs programs = None if "programs" in sec_map: j = sec_map["programs"].get("section_json", {}) if not isinstance(j, dict): j = {} programs_raw = j.get("programs", []) if not isinstance(programs_raw, list): programs_raw = [] if not include_inactive_programs: programs_raw = [ p for p in programs_raw if isinstance(p, dict) and is_truthy( p.get("program_active", p.get("is_active", p.get("active", 1))) ) ] programs = [] seen_names = set() for p in programs_raw: if not isinstance(p, dict): continue program_name = str(p.get("program_name", "")).strip() # Deduplicate by lowercase program name key = program_name.lower() if key in seen_names: continue seen_names.add(key) link = str(p.get("program_link", "")).strip() if not link and isinstance(p.get("program_links"), dict): link = str(p["program_links"].get("web_link", "")).strip() career = p.get("career_pathways", []) career_items: list[str] = [] if isinstance(career, list): career_items = [str(x).strip() for x in career if str(x).strip()] else: raw = str(career).strip() if raw: career_items = [l.strip() for l in re.split(r"[\r\n]+", raw) if l.strip()] programs.append({ "name": program_name, "link": link, "designation": str(p.get("designation", "")), "entrance": str(p.get("entrance_exam", p.get("entrance_examination", ""))), "career_items": career_items, "funding": str(p.get("funding_category", "")), }) # Extra sections (rendered via global blocks normalizer) skip_keys = {"campus_image", "image", "overview", "benefits", "programs"} extra_blocks: list[list[RenderBlock]] = [] for s in sections: if not isinstance(s, dict): continue k = str(s.get("section_key", "")) if not k or k in skip_keys: continue title = str(s.get("section_title", "")) j = s.get("section_json", {}) if not isinstance(j, dict): j = {} extra_blocks.append(normalize_section(k, title, j, debug=debug)) classes = ["hb-school-profile", "page-break"] return RenderBlock( block_type="school_profile", css_class=" ".join(classes), data={ "name": uni_name, "anchor": uni_raw.get("anchor"), "sort_order": uni_raw.get("sort_order"), "website": resolved_website, "overview": overview_data, "campus_image": campus_image, "campus_caption": campus_caption, "benefits": benefits, "programs": programs, "extra_blocks": extra_blocks, }, ) # ─────────────────────────────────────────────────────────────── # Internal helpers # ─────────────────────────────────────────────────────────────── def _normalize_steps(steps: list) -> list[dict]: """Normalise enrollment steps into structured dicts.""" result = [] step_num = 0 for s in steps: if not isinstance(s, dict): continue step_num += 1 step_title = str(s.get("title", s.get("step_title", ""))).strip() body = format_money_figures(str(s.get("body", s.get("description", ""))).strip()) # Pre-format body with bold emphasis on REGULAR, PRIME, $ amounts from markupsafe import Markup body_html = Markup(emphasize_keywords(body)) if body else "" links = [] raw_links = s.get("links", []) if isinstance(raw_links, list): for lnk in raw_links: if not isinstance(lnk, dict): continue label = str(lnk.get("label", "Link")).strip() url = str(lnk.get("url", "")).strip() if url: links.append({"label": label, "url": url}) qr = str(s.get("qr_url", s.get("qr_image", ""))).strip() result.append({ "number": step_num, "title": step_title, "body": body, "body_html": body_html, "links": links, "qr_url": qr, }) return result def _normalize_basic_table(cols: list, rows: list) -> RenderBlock: """Normalise a basic table (columns + rows).""" norm_rows = [] for r in rows: if not isinstance(r, (list, dict)): continue if isinstance(r, dict): row = [] for col_label in cols: key_guess = re.sub(r"[^a-z0-9]+", "_", str(col_label).lower()) cell = r.get(key_guess, "") row.append(format_money_figures(str(cell))) norm_rows.append(row) else: norm_rows.append([format_money_figures(str(cell)) for cell in r]) return RenderBlock( block_type="table", css_class="hb-table", data={ "columns": [str(c) for c in cols], "rows": norm_rows, "variant": "standard", }, ) def _normalize_table_v2(json_data: dict) -> RenderBlock: """Normalise table_v2 (comparison table with header groups).""" base_cols = json_data.get("base_columns", []) groups = json_data.get("header_groups", []) rows = json_data.get("rows", []) if not isinstance(base_cols, list): base_cols = [] if not isinstance(groups, list): groups = [] if not isinstance(rows, list): rows = [] all_cols: list[dict] = [] for c in base_cols: if isinstance(c, dict): all_cols.append({"key": str(c.get("key", "")), "label": str(c.get("label", ""))}) for g in groups: if not isinstance(g, dict): continue g_cols = g.get("columns", []) if not isinstance(g_cols, list): g_cols = [] for c in g_cols: if isinstance(c, dict): all_cols.append({"key": str(c.get("key", "")), "label": str(c.get("label", ""))}) norm_rows = [] for r in rows: if not isinstance(r, dict): continue row = {} for c in all_cols: k = c.get("key", "") val = r.get(k, "") if isinstance(val, dict): val = val.get("text", "") row[k] = format_money_figures(str(val)) norm_rows.append(row) return RenderBlock( block_type="table", css_class="hb-table hb-table-comparison", data={ "base_columns": [{"key": c.get("key", ""), "label": c.get("label", "")} for c in base_cols if isinstance(c, dict)], "header_groups": [ { "label": str(g.get("label", "")), "columns": [{"key": str(c.get("key", "")), "label": str(c.get("label", ""))} for c in (g.get("columns", []) if isinstance(g.get("columns"), list) else []) if isinstance(c, dict)], } for g in groups if isinstance(g, dict) ], "all_columns": all_cols, "rows": norm_rows, "variant": "comparison", }, ) def _normalize_doc_v1(blocks: list) -> list[RenderBlock]: """Normalise doc_v1 blocks into typed RenderBlocks.""" from markupsafe import Markup result: list[RenderBlock] = [] for b in blocks: if not isinstance(b, dict): continue btype = str(b.get("type", "")) if btype == "paragraph": t = format_money_figures(str(b.get("text", ""))) if t.strip(): result.append(RenderBlock( block_type="paragraph", css_class="hb-paragraph", data={ "text": t, "html": Markup(emphasize_keywords(t)), }, )) elif btype == "subheading": t = format_money_figures(str(b.get("text", ""))) if t.strip(): result.append(RenderBlock( block_type="heading_2", css_class="hb-heading-2", data={"text": t}, )) elif btype == "bullets": items = b.get("items", []) if not isinstance(items, list): items = [] normalized = [format_money_figures(str(it).strip()) for it in items if str(it).strip()] html_items = [Markup(emphasize_keywords(it)) for it in normalized] if normalized: result.append(RenderBlock( block_type="bullet_list", css_class="hb-bullet-list", data={"entries": html_items, "html_entries": True}, )) elif btype == "numbered_list": items = b.get("items", []) if not isinstance(items, list): items = [] normalized = [format_money_figures(str(it).strip()) for it in items if str(it).strip()] html_items = [Markup(emphasize_keywords(it)) for it in normalized] if normalized: result.append(RenderBlock( block_type="bullet_list", css_class="hb-bullet-list hb-numbered-list", data={"entries": html_items, "ordered": True, "html_entries": True}, )) elif btype == "note": t = format_money_figures(str(b.get("text", ""))) if t.strip(): result.append(RenderBlock( block_type="note", css_class="hb-note", data={"text": t}, )) elif btype == "note_inline": parts = b.get("parts", []) if not isinstance(parts, list): parts = [] normalized_parts = [] for p in parts: if not isinstance(p, dict): continue t = format_money_figures(str(p.get("text", ""))) if t: normalized_parts.append({ "text": t, "style": str(p.get("style", "")), }) if normalized_parts: result.append(RenderBlock( block_type="note", css_class="hb-note", data={"parts": normalized_parts, "inline": True}, )) elif btype == "table_v1": t_cols = b.get("columns", []) t_rows = b.get("rows", []) if not isinstance(t_cols, list): t_cols = [] if not isinstance(t_rows, list): t_rows = [] norm_rows = [] for r in t_rows: if not isinstance(r, list): continue norm_rows.append([format_money_figures(str(cell)) for cell in r]) result.append(RenderBlock( block_type="table", css_class="hb-table", data={"columns": [str(c) for c in t_cols], "rows": norm_rows, "variant": "standard"}, )) elif btype in ("table_v3", "table_v4"): t_rows = b.get("rows", []) if not isinstance(t_rows, list): t_rows = [] norm_rows = [] for r in t_rows: if not isinstance(r, list): continue norm_row = [] for cell in r: if isinstance(cell, dict): norm_row.append({ "text": format_money_figures(str(cell.get("text", ""))), "colspan": int(cell.get("colspan", 1)) if str(cell.get("colspan", "")).isdigit() else 1, "rowspan": int(cell.get("rowspan", 1)) if str(cell.get("rowspan", "")).isdigit() else 1, }) else: norm_row.append({ "text": format_money_figures(str(cell)), "colspan": 1, "rowspan": 1, }) norm_rows.append(norm_row) result.append(RenderBlock( block_type="table", css_class="hb-table", data={"rows": norm_rows, "variant": "spanning"}, )) return result def _normalize_university_summary( section_title: str, json_data: dict, layout_norm: str, universities: list[dict], ) -> list[RenderBlock]: """Normalise the summary_of_universities section.""" blocks: list[RenderBlock] = [] title = section_title.strip() if title: blocks.append(RenderBlock( block_type="heading_1", css_class="hb-heading-1", data={"text": title}, )) # Intro intro = str(json_data.get("intro", "")).strip() if intro: blocks.append(RenderBlock( block_type="paragraph", css_class="hb-paragraph", data={"text": format_money_figures(intro)}, )) elif layout_norm == "doc_v1" and isinstance(json_data.get("blocks"), list): for b in json_data["blocks"]: if not isinstance(b, dict): continue btype = str(b.get("type", "")) if btype not in ("paragraph", "subheading", "note"): continue t = format_money_figures(str(b.get("text", ""))) if not t.strip(): continue if btype == "subheading": blocks.append(RenderBlock(block_type="heading_2", css_class="hb-heading-2", data={"text": t})) elif btype == "note": blocks.append(RenderBlock(block_type="note", css_class="hb-note", data={"text": t})) else: blocks.append(RenderBlock(block_type="paragraph", css_class="hb-paragraph", data={"text": t})) # Resolve university list resolved: list[str] = [] if universities: def uni_sort_key(u): so = u.get("sort_order") if isinstance(u, dict) else None if so is not None: try: return (0, float(so)) except (ValueError, TypeError): pass return (1, 0.0) sorted_unis = sorted(universities, key=uni_sort_key) for u in sorted_unis: if isinstance(u, dict): name = str(u.get("university_name", u.get("name", ""))).strip() if name: resolved.append(name) if not resolved and layout_norm == "doc_v1" and isinstance(json_data.get("blocks"), list): for b in json_data["blocks"]: if not isinstance(b, dict) or str(b.get("type", "")) != "bullets": continue items = b.get("items", []) if isinstance(items, list): for it in items: it_str = str(it).strip() if it_str: resolved.append(it_str) # Dedupe seen: set[str] = set() deduped: list[str] = [] for nm in resolved: k = nm.lower().strip() if k and k not in seen: seen.add(k) deduped.append(nm) if deduped: blocks.append(RenderBlock( block_type="university_summary", css_class="hb-university-summary", data={"universities": deduped}, )) note = str(json_data.get("note", "")).strip() if note: blocks.append(RenderBlock( block_type="note", css_class="hb-note", data={"text": format_money_figures(note)}, )) return blocks