Spaces:
Running
Running
| """ | |
| Climate Disclosure RAG | |
| ================================== | |
| Gradio Web Interface for sustainability report QA. | |
| Launch: | |
| python app.py | |
| """ | |
| import os | |
| import sys | |
| import json | |
| import time | |
| import re | |
| import html | |
| from urllib.parse import quote | |
# Work around a gradio_client crash: the JSON Schema spec allows a schema to
# be the boolean `true`/`false`, but `_json_schema_to_python_type` assumes a
# dict and raises. Wrap it so boolean schemas map to "Any" before delegating.
import gradio_client.utils as _gcu
# Keep a reference to the original implementation for delegation.
_orig_json_schema_fn = _gcu._json_schema_to_python_type
def _safe_json_schema_to_python_type(schema, defs=None):
    """Drop-in replacement that tolerates boolean JSON schemas."""
    if isinstance(schema, bool):
        return "Any"
    # Everything else is handled by the original function.
    return _orig_json_schema_fn(schema, defs)
_gcu._json_schema_to_python_type = _safe_json_schema_to_python_type
| import gradio as gr | |
| SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) | |
| if SCRIPT_DIR not in sys.path: | |
| sys.path.insert(0, SCRIPT_DIR) | |
| from rag_app_backend import ( | |
| run_rag, | |
| run_trustworthy_step1, | |
| run_trustworthy_step2, | |
| run_trustworthy_step3_claims, | |
| API_GEN_MODEL_ALIASES, | |
| list_reports, | |
| HAS_GPU, | |
| OPENAI_EMBED_MODELS, | |
| MOCK_MODE, | |
| REPORTS_DIR, | |
| get_report_chunks, | |
| ) | |
# ======================== Constants ========================
# Example questions shown as placeholders for single- and multi-report modes.
PLACEHOLDER_SINGLE = (
    "For 2022 Microsoft Environmental Sustainability Report, "
    "do the environmental/sustainability targets set by the company "
    "reference external climate change adaptation goals/targets?"
)
PLACEHOLDER_MULTI = (
    'For "Does the company encourage downstream partners to carry out climate-related '
    'risk assessments?", is Boeing 2023 Sustainability Report better than '
    'AT&T 2022 Sustainability Summary in disclosure quality?'
)
# Source repository of the report PDFs (ClimRetrieve dataset).
REPORTS_GITHUB_URL = "https://github.com/tobischimanski/ClimRetrieve/tree/main/Reports"
# Embedding options usable without a GPU (BM25 plus OpenAI API models).
CPU_EMBED_MODELS = [
    "BM25",
    "text-embedding-3-large",
    "text-embedding-3-small",
    "text-embedding-ada-002",
]
# Local embedding models that require GPU inference.
GPU_EMBED_MODELS = [
    "Qwen3-Embedding-0.6B",
    "Qwen3-Embedding-4B",
]
EMBED_MODELS = (CPU_EMBED_MODELS + GPU_EMBED_MODELS) if HAS_GPU else CPU_EMBED_MODELS
GPU_GEN_MODELS = [
    "Qwen3-4B-Instruct-2507-FP8",
]
API_GEN_MODELS = list(API_GEN_MODEL_ALIASES.keys())
# Default API generation model; falls back to a label when no aliases exist.
API_GEN_MODEL = API_GEN_MODELS[0] if API_GEN_MODELS else "GPT-5-mini (API)"
GEN_MODELS = GPU_GEN_MODELS + API_GEN_MODELS if HAS_GPU else API_GEN_MODELS
OPENAI_EMBED_MODELS_SET = set(OPENAI_EMBED_MODELS)
# API keys: primary env var first, then a deployment-specific fallback.
DEFAULT_OPENAI_API_KEY = (
    os.getenv("OPENAI_API_KEY", "").strip()
    or os.getenv("OPENAI_API_KEY_88996", "").strip()
)
DEFAULT_GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "").strip()
| def _is_gemini_generation_model(gen_model: str) -> bool: | |
| text = str(gen_model or "").strip().upper() | |
| return "GEMINI" in text | |
| # ======================== Helpers ======================== | |
# Cache of report filename -> page count, so the PDF is parsed only once.
_pdf_page_count_cache = {}

def _get_pdf_total_pages(report_name: str) -> int:
    """Return the page count of a report PDF (minimum 1), with caching.

    Falls back to 1 when the file cannot be opened. Fix: failures are NOT
    cached anymore — the original cached the fallback value 1 permanently,
    so a report that became readable later (e.g. it finished uploading)
    kept reporting a single page forever.
    """
    if not report_name:
        return 1
    cached = _pdf_page_count_cache.get(report_name)
    if cached is not None:
        return cached
    pdf_path = os.path.join(REPORTS_DIR, report_name)
    try:
        try:
            from pypdf import PdfReader  # preferred, maintained package
        except Exception:
            from PyPDF2 import PdfReader  # legacy fallback
        total = max(1, len(PdfReader(pdf_path).pages))
    except Exception:
        # Transient failure (missing/partial/corrupt file): do not cache.
        return 1
    _pdf_page_count_cache[report_name] = total
    return total
def _pdf_iframe(report_name: str, page: int = 1) -> str:
    """Build an <iframe> embedding the given report PDF opened at *page*.

    The PDF is served through Gradio's /file= route; the page number is
    clamped to at least 1 and passed via the URL fragment.
    """
    abs_path = os.path.abspath(os.path.join(REPORTS_DIR, report_name))
    normalized = abs_path.replace("\\", "/")
    page_no = max(1, int(page))
    src = f"/file={quote(normalized, safe='/:')}#page={page_no}&view=FitH"
    fallback = "<p>PDF preview is not supported in this browser. Please download the file.</p>"
    return (
        f'<iframe src="{src}" width="100%" height="720" '
        f'style="border:1px solid #ddd; border-radius:8px;" '
        f'type="application/pdf">{fallback}</iframe>'
    )
| def _format_evidence(contexts, highlight_keys=None, highlight_color: str = "#ff7043"): | |
| highlight_set = set(highlight_keys or []) | |
| medals = {0: "\U0001F947", 1: "\U0001F948", 2: "\U0001F949"} | |
| parts = [] | |
| for i, c in enumerate(contexts): | |
| badge = medals.get(i, f"#{i+1}") | |
| report_short = c["report"].replace(".pdf", "") | |
| score = c["score"] | |
| page = c.get("page", None) | |
| key = (str(c.get("report", "")), str(c.get("chunk_idx", ""))) | |
| text_body = str(c.get("text", ""))[:800] | |
| if key in highlight_set: | |
| safe_text = html.escape(text_body) | |
| text_body = ( | |
| f"<div style=\"background:{highlight_color}33;border-left:6px solid {highlight_color};" | |
| f"padding:8px 10px;border-radius:8px;\">{safe_text}</div>" | |
| ) | |
| parts.append( | |
| f"### {badge} {report_short}\n" | |
| f"**Similarity:** {score:.4f} | " | |
| f"**Chunk:** {c['chunk_idx']}" | |
| + (f", **Page:** {page}" if page not in (None, "", "NA") else "") | |
| + "\n\n" | |
| f"{text_body}" | |
| ) | |
| return "\n\n---\n\n".join(parts) if parts else "No evidence retrieved." | |
| def _render_waiting(text: str) -> str: | |
| return f"<div class='waiting-banner'>{text}</div>" | |
| def _preview_chunk_text(text: str, max_sentences: int = 2, max_chars: int = 260) -> str: | |
| raw = str(text or "").replace("\n", " ").replace("\r", " ").strip() | |
| if not raw: | |
| return "" | |
| sentences = [s.strip() for s in re.split(r"(?<=[.!?])\s+", raw) if s.strip()] | |
| if sentences: | |
| preview = " ".join(sentences[:max_sentences]).strip() | |
| else: | |
| preview = raw[:max_chars].strip() | |
| if len(preview) > max_chars: | |
| preview = preview[:max_chars].rstrip() | |
| if len(preview) < len(raw): | |
| return preview + "..." | |
| return preview | |
def _render_step1_clusters_md(step1: dict) -> str:
    """Render Step 1 (retrieval + clustering) results as Markdown.

    Shows retrieval counts and average similarity, then one section per
    cluster with lettered headings (A, B, C, ...) and short evidence
    previews. A non-dict *step1* renders as empty stats.
    """
    contexts = step1.get("contexts", []) if isinstance(step1, dict) else []
    avg_sim = float(step1.get("average_similarity", 0.0) or 0.0) if isinstance(step1, dict) else 0.0
    clusters = step1.get("clusters", []) if isinstance(step1, dict) else []
    lines = [
        "## STEP 1 - RETRIEVAL & EVIDENCE CLUSTERS",
        f"- Retrieved **{len(contexts)}** paragraphs",
        f"- Clustered into **{len(clusters)}** groups",
        f"- Average similarity: **{avg_sim:.2f}**",
        "- Note: Average similarity is the mean of retrieved Similarity scores.",
        "",
    ]
    if not clusters:
        lines.append("_No clusters returned._")
        return "\n".join(lines)
    for i, c in enumerate(clusters):
        cname = str(c.get("cluster_name", f"Cluster {i + 1}")).strip()
        csum = str(c.get("summary", "")).strip()
        # Letter headings wrap around after Z (A, B, ..., Z, A, ...).
        lines.append(f"### Cluster {chr(65 + (i % 26))} - {cname}")
        if csum:
            lines.append(f"Summary: {csum}")
        evs = c.get("evidence", [])
        if isinstance(evs, list) and evs:
            for ev in evs:
                chunk_idx = ev.get("chunk_idx", "NA")
                preview = _preview_chunk_text(str(ev.get("text", "")), max_sentences=2, max_chars=320)
                lines.append(f"- Chunk {chunk_idx}: {preview}")
        else:
            lines.append("- No linked evidence items")
        lines.append("")
    return "\n".join(lines).strip()
def _render_step2_claims_md(step2: dict) -> str:
    """Render Step 2 output (the generated answer) as Markdown.

    Emits a warning bullet when Step 2 returned no answer. The answer text
    is passed through `_format_answer`, which is defined elsewhere in this
    file — presumably Markdown cleanup; confirm its contract there.
    """
    if not isinstance(step2, dict):
        return "## STEP 2 - ANSWER GENERATION\n_No generation output._"
    answer_ready = str(step2.get("answer", "")).strip()
    lines = ["## STEP 2 - ANSWER GENERATION", ""]
    if not answer_ready:
        lines.append("- Warning: no answer returned from Step 2.")
        return "\n".join(lines)
    lines.append("### Generated Answer")
    lines.append(_format_answer(answer_ready))
    return "\n".join(lines)
| def _render_step2_summary_md(step2: dict) -> str: | |
| if not isinstance(step2, dict): | |
| return "## STEP 2 - ANSWER GENERATION\n_No generation output._" | |
| answer_ready = str(step2.get("answer", "")).strip() | |
| if not answer_ready: | |
| return "## STEP 2 - ANSWER GENERATION\n- Warning: no answer returned from Step 2." | |
| return "\n".join( | |
| [ | |
| "## STEP 2 - ANSWER GENERATION", | |
| "- Generated answer is shown in the **Generated Answer** panel.", | |
| ] | |
| ) | |
| def _render_step3_md(step3: dict) -> str: | |
| return "\n".join( | |
| [ | |
| "## STEP 3 - CLAIM EXTRACTOR", | |
| "", | |
| "Use the **Claim Trace** buttons below to inspect full claims and highlight linked evidence chunks.", | |
| ] | |
| ) | |
| CLAIM_COLORS = ["#ff7043", "#ffd54f", "#4fc3f7"] | |
| def _prepare_claim_trace(step3: dict): | |
| claim_links = step3.get("claim_links", []) if isinstance(step3, dict) else [] | |
| if not isinstance(claim_links, list): | |
| claim_links = [] | |
| trace = [] | |
| for row in claim_links[:3]: | |
| if not isinstance(row, dict): | |
| continue | |
| claim_text = str(row.get("claim", "")).strip() | |
| evs = row.get("evidence", []) | |
| if not isinstance(evs, list): | |
| evs = [] | |
| keys = [] | |
| chunk_refs = [] | |
| for ev in evs: | |
| if not isinstance(ev, dict): | |
| continue | |
| report = str(ev.get("report", "")) | |
| chunk_idx = str(ev.get("chunk_idx", "")) | |
| keys.append((report, chunk_idx)) | |
| if report or chunk_idx: | |
| report_short = report.replace(".pdf", "").strip() if report else "" | |
| if report_short and chunk_idx: | |
| chunk_refs.append(f"{report_short} chunk {chunk_idx}") | |
| elif chunk_idx: | |
| chunk_refs.append(f"chunk {chunk_idx}") | |
| elif report_short: | |
| chunk_refs.append(report_short) | |
| trace.append( | |
| { | |
| "label": claim_text or "Claim", | |
| "keys": keys, | |
| "chunk_refs": chunk_refs, | |
| "score": None, | |
| } | |
| ) | |
| return trace | |
def _default_claim_button_updates():
    """Return hidden, default-labelled updates for the three claim buttons."""
    return [gr.update(value=f"Claim {i}", visible=False) for i in range(1, 4)]
def _claim_button_updates(trace):
    """Build visible button updates for up to three extracted claims.

    Visible buttons are labelled "<color> Claim N [chunk refs]: <claim text>"
    (with the score included when present); unused slots stay hidden.
    """
    prefixes = ["🟧", "🟨", "🟦"]
    updates = []
    for i in range(3):
        if i >= len(trace):
            updates.append(gr.update(value=f"Claim {i+1}", visible=False))
            continue
        entry = trace[i]
        label = str(entry.get("label", f"Claim {i+1}")).strip()
        score = entry.get("score", None)
        refs = entry.get("chunk_refs", []) if isinstance(entry, dict) else []
        ref_text = ", ".join(refs) if refs else "no chunk"
        if score is None:
            text = f"{prefixes[i]} Claim {i+1} [{ref_text}]: {label}"
        else:
            text = f"{prefixes[i]} Claim {i+1} ({score:.2f}) [{ref_text}]: {label}"
        updates.append(gr.update(value=text, visible=True))
    return updates
def on_claim_click(claim_idx, contexts_state, claim_trace_state):
    """Re-render the evidence panel with the selected claim's chunks highlighted."""
    contexts = contexts_state if isinstance(contexts_state, list) else []
    trace = claim_trace_state if isinstance(claim_trace_state, list) else []
    idx = int(claim_idx)
    if 0 <= idx < len(trace):
        entry = trace[idx]
        keys = entry.get("keys", []) if isinstance(entry, dict) else []
        color = CLAIM_COLORS[idx % len(CLAIM_COLORS)]
        return _format_evidence(contexts, highlight_keys=keys, highlight_color=color)
    return _format_evidence(contexts)
def clear_claim_highlight(contexts_state):
    """Re-render the evidence panel without any claim highlighting."""
    if isinstance(contexts_state, list):
        return _format_evidence(contexts_state)
    return _format_evidence([])
| def _md_cell(value) -> str: | |
| text = "" if value is None else str(value) | |
| text = text.replace("\n", " ").replace("\r", " ").strip() | |
| return text.replace("|", "\\|") | |
def _md_table(rows, columns) -> str:
    """Render dict rows as a GitHub-flavoured Markdown table; '' when empty."""
    if not rows:
        return ""
    lines = [
        "| " + " | ".join(columns) + " |",
        "| " + " | ".join("---" for _ in columns) + " |",
    ]
    for row in rows:
        cells = [_md_cell(row.get(col, "")) for col in columns]
        lines.append("| " + " | ".join(cells) + " |")
    return "\n".join(lines)
def _truncate(text: str, max_len: int = 180) -> str:
    """Sanitize for a Markdown cell and cap at *max_len* chars with an ellipsis."""
    cell = _md_cell(text)
    if len(cell) <= max_len:
        return cell
    return cell[: max_len - 3] + "..."
| def _shorten_text(text: str, max_len: int = 240) -> str: | |
| text = "" if text is None else str(text) | |
| text = text.replace("\n", " ").replace("\r", " ").strip() | |
| return text if len(text) <= max_len else text[: max_len - 3] + "..." | |
| def _pretty_company_name(raw: str) -> str: | |
| if not raw: | |
| return "" | |
| name = raw.replace("_", " ").strip() | |
| words = [w.upper() if w.lower() in {"esg"} else w.capitalize() for w in name.split()] | |
| return " ".join(words) | |
| def _escape(text) -> str: | |
| return html.escape("" if text is None else str(text)) | |
def _extract_number_snippets(texts, max_items: int = 3):
    """Pull up to *max_items* short snippets around numeric climate metrics.

    Matches numbers followed by units such as %, percent, billion, tCO2e,
    tons or years, and returns ~80-char windows of surrounding context.
    Non-list input yields an empty list.
    """
    if not isinstance(texts, list):
        return []
    unit_pattern = re.compile(
        r"\b\d+(?:\.\d+)?\s?(?:%|percent|billion|million|thousand|mt|tco2e|tons?|years?)\b",
        flags=re.IGNORECASE,
    )
    found = []
    for raw in texts:
        source = str(raw or "")
        for match in unit_pattern.finditer(source):
            lo = max(0, match.start() - 40)
            hi = min(len(source), match.end() + 40)
            found.append(_shorten_text(source[lo:hi], 120))
            if len(found) >= max_items:
                return found
    return found
| def _normalize_confidence_distribution(raw): | |
| if not isinstance(raw, dict): | |
| return None | |
| out = {} | |
| for key in ("high", "medium", "low"): | |
| v = raw.get(key, None) | |
| if v is None: | |
| continue | |
| try: | |
| if isinstance(v, str): | |
| s = v.strip() | |
| if s.endswith("%"): | |
| val = float(s[:-1]) / 100.0 | |
| else: | |
| val = float(s) | |
| else: | |
| val = float(v) | |
| if val > 1.0 and val <= 100.0: | |
| val = val / 100.0 | |
| val = max(0.0, min(1.0, val)) | |
| out[key] = val | |
| except Exception: | |
| continue | |
| if not out: | |
| return None | |
| for key in ("high", "medium", "low"): | |
| out.setdefault(key, 0.0) | |
| total = out["high"] + out["medium"] + out["low"] | |
| if total <= 0: | |
| return None | |
| # Normalize to sum=1 for stable UI display. | |
| out["high"] = out["high"] / total | |
| out["medium"] = out["medium"] / total | |
| out["low"] = out["low"] / total | |
| return out | |
def _extract_confidence_distribution_from_text(text: str):
    """Pull a high/medium/low confidence distribution out of free-form text.

    Returns a (distribution, cleaned_text) tuple. First tries to parse a
    literal "confidence distribution: {...}" JSON blob (tolerating single
    quotes); only in that case is the blob stripped from the returned text.
    Otherwise falls back to scanning for "high: 60%"-style key/value pairs,
    leaving the text untouched. *distribution* is None when nothing usable
    is found.
    """
    raw = str(text or "")
    cleaned = raw
    m = re.search(r"confidence\s*distribution\s*:\s*(\{[^{}]+\})", raw, flags=re.IGNORECASE)
    if m:
        blob = m.group(1)
        parsed = None
        # Try strict JSON first, then a single-quote-tolerant variant.
        for cand in (blob, blob.replace("'", '"')):
            try:
                parsed = json.loads(cand)
                break
            except Exception:
                continue
        conf = _normalize_confidence_distribution(parsed)
        if conf:
            # Remove the matched "confidence distribution: {...}" span.
            cleaned = raw.replace(m.group(0), "").strip()
            return conf, cleaned
    # Fallback: scan for individual "high/medium/low: <number>[%]" pairs.
    tmp = {}
    for k in ("high", "medium", "low"):
        mk = re.search(rf"\b{k}\b\s*[:=]\s*([0-9]+(?:\.[0-9]+)?)\s*(%)?", raw, flags=re.IGNORECASE)
        if not mk:
            continue
        num = float(mk.group(1))
        if mk.group(2):
            num = num / 100.0
        tmp[k] = num
    conf = _normalize_confidence_distribution(tmp if tmp else None)
    return conf, cleaned
def _render_confidence_distribution(raw) -> str:
    """Render a confidence distribution as an HTML bar-chart card.

    Returns "" when *raw* cannot be normalized into high/medium/low shares.
    """
    conf = _normalize_confidence_distribution(raw)
    if not conf:
        return ""
    def _row(label: str, key: str, cls: str) -> str:
        # One labelled bar; the fill width encodes the share as a percentage.
        pct = conf[key] * 100.0
        return (
            "<div class='conf-row'>"
            f"<div class='conf-label'>{label}</div>"
            "<div class='conf-bar'>"
            f"<div class='conf-fill {cls}' style='width:{pct:.1f}%'></div>"
            "</div>"
            f"<div class='conf-value'>{pct:.1f}%</div>"
            "</div>"
        )
    return (
        "<div class='confidence-card'>"
        "<h4>Confidence Distribution</h4>"
        f"{_row('High', 'high', 'conf-high')}"
        f"{_row('Medium', 'medium', 'conf-medium')}"
        f"{_row('Low', 'low', 'conf-low')}"
        "</div>"
    )
def _attach_confidence_block(card_html: str, parsed: dict) -> str:
    """Append the confidence-distribution card to an HTML card, if available.

    When *card_html* ends with a closing </div>, the block is spliced just
    inside it so the card keeps a single root element; otherwise it is
    appended after the existing markup.
    """
    conf_html = _render_confidence_distribution(parsed.get("confidence_distribution"))
    if not conf_html:
        return card_html
    text = str(card_html or "")
    if text.rstrip().endswith("</div>"):
        # Cut off the trailing "</div>" (6 characters) and re-close after.
        return text.rstrip()[:-6] + conf_html + "</div>"
    return text + conf_html
| def _normalize_skill_for_render(parsed: dict) -> str: | |
| skill = str(parsed.get("skill", "")).strip().lower() | |
| if "trend" in skill and "quant" in skill: | |
| return "Trend & Quant Comparator" | |
| if "attainment" in skill or ("delta" in skill and "benchmark" in skill): | |
| return "Target Attainment & Delta Benchmark" | |
| if "compliance" in skill and "check" in skill: | |
| return "Compliance Checklist" | |
| if "dimension" in skill and "extract" in skill: | |
| return "Dimension Extractor" | |
| if "contradiction" in skill or "consistency" in skill: | |
| return "Contradiction/Consistency Check" | |
| if "consensus" in skill or "portfolio" in skill or "count" in skill: | |
| return "Consensus/Count (Portfolio Statistics)" | |
| if "comparative" in skill or "table" in skill: | |
| return "Comparative Table Builder" | |
| # Fallback by schema keys when model misses `skill`. | |
| if isinstance(parsed.get("required_checks"), list) and isinstance(parsed.get("reports"), list): | |
| return "Compliance Checklist" | |
| if isinstance(parsed.get("counts"), dict) and isinstance(parsed.get("per_report"), list): | |
| return "Consensus/Count (Portfolio Statistics)" | |
| if isinstance(parsed.get("checks"), list) and isinstance(parsed.get("scores"), dict): | |
| return "Contradiction/Consistency Check" | |
| reports = parsed.get("reports") | |
| if isinstance(reports, list) and reports: | |
| first = reports[0] if isinstance(reports[0], dict) else {} | |
| if isinstance(first.get("benchmarks"), list): | |
| return "Target Attainment & Delta Benchmark" | |
| if isinstance(first.get("quant_metrics"), list) and "strength_score" in first: | |
| return "Trend & Quant Comparator" | |
| if isinstance(first.get("bucket_counts"), dict): | |
| return "Dimension Extractor" | |
| if "maturity_level" in first or "comparison_metrics" in parsed: | |
| return "Comparative Table Builder" | |
| if str(parsed.get("report", "")).strip() and any(k in parsed for k in ("maturity_level", "key_evidence", "quant_metrics", "comparison_metrics", "year")): | |
| return "Comparative Table Builder" | |
| return "" | |
def _render_quant_metrics_matrix(reports: list, report_key: str = "report", metrics_key: str = "quant_metrics") -> str:
    """Build an HTML metric-by-report comparison table.

    Collects every named metric across *reports* into rows (sorted
    alphabetically), one column per report; missing cells render "N/A".
    Returns "" for empty input and a note when no metrics were found.
    """
    if not isinstance(reports, list) or not reports:
        return ""
    cols = []
    metric_map = {}
    for r in reports:
        if not isinstance(r, dict):
            continue
        name = str(r.get(report_key, "Unknown")).strip() or "Unknown"
        cols.append(name)
        qms = r.get(metrics_key, [])
        if not isinstance(qms, list):
            continue
        for m in qms:
            if not isinstance(m, dict):
                continue
            metric = str(m.get("metric", "")).strip()
            if not metric:
                continue
            value = m.get("value", None)
            unit = m.get("unit", None)
            period = m.get("period", None)
            if value is None or str(value).strip() == "":
                cell = "N/A"
            else:
                # Cell format: "<value> <unit> (<period>)", unit/period optional.
                cell = str(value)
                if unit not in (None, ""):
                    cell += f" {unit}"
                if period not in (None, ""):
                    cell += f" ({period})"
            # NOTE(review): duplicate report names collapse into one column key.
            metric_map.setdefault(metric, {})[name] = cell
    if not metric_map:
        return "<div class='maturity-note'>No explicit quantitative metrics found in the output.</div>"
    head_cols = "".join(f"<th>{_escape(c)}</th>" for c in cols)
    body_rows = []
    for metric in sorted(metric_map.keys()):
        row_cells = "".join(f"<td>{_escape(metric_map[metric].get(c, 'N/A'))}</td>" for c in cols)
        body_rows.append(f"<tr><th>{_escape(metric)}</th>{row_cells}</tr>")
    return (
        "<h4>Quantitative Comparison</h4>"
        "<div class='maturity-table-wrap'>"
        "<table class='maturity-table'>"
        f"<thead><tr><th>Metric</th>{head_cols}</tr></thead>"
        f"<tbody>{''.join(body_rows)}</tbody>"
        "</table></div>"
    )
def _render_maturity_comparison_html(parsed: dict) -> str:
    """Render a per-company maturity comparison card as HTML.

    *parsed["maturity_comparison"]* maps company keys to dicts holding
    "maturity_level", "evidence" (list) and "rationale". The table is
    transposed: companies are columns; the maturity badge, up to two
    evidence bullets and extracted numeric snippets are rows. An optional
    "conclusion" string is appended. Returns "" when no comparison exists.
    """
    mc = parsed.get("maturity_comparison")
    if not isinstance(mc, dict) or not mc:
        return ""
    companies = list(mc.keys())
    rows_html = []
    level_cells = []
    evidence_cells = []
    quant_cells = []
    for k in companies:
        item = mc.get(k, {}) if isinstance(mc.get(k), dict) else {}
        level = str(item.get("maturity_level", "unknown")).strip()
        low = level.lower()
        # "insufficient" levels get a trailing " x" marker on the badge.
        suffix = " x" if low == "insufficient" else ""
        badge_cls = f"level-{low}" if low in {"high", "moderate", "low", "insufficient"} else "level-unknown"
        level_cells.append(f'<td><span class="maturity-badge {badge_cls}">{_escape(level + suffix)}</span></td>')
        evidence = item.get("evidence", [])
        if isinstance(evidence, list) and evidence:
            bullets = "".join(f"<li>{_escape(_shorten_text(e, 180))}</li>" for e in evidence[:2])
            evidence_cells.append(f"<td><ul class='maturity-list'>{bullets}</ul></td>")
        else:
            evidence_cells.append("<td><span class='muted'>N/A x</span></td>")
        rationale = item.get("rationale", "")
        # Mine evidence + rationale for numeric snippets (e.g. "50% by 2030").
        snippets = _extract_number_snippets((evidence if isinstance(evidence, list) else []) + [rationale], max_items=3)
        if snippets:
            quant_cells.append("<td><ul class='maturity-list'>" + "".join(f"<li>{_escape(s)}</li>" for s in snippets) + "</ul></td>")
        else:
            quant_cells.append("<td><span class='muted'>No numeric metric found</span></td>")
    rows_html.append("<tr><th>Maturity Level</th>" + "".join(level_cells) + "</tr>")
    rows_html.append("<tr><th>Key Evidence</th>" + "".join(evidence_cells) + "</tr>")
    rows_html.append("<tr><th>Quant Metrics</th>" + "".join(quant_cells) + "</tr>")
    header_cells = "".join(f"<th>{_escape(_pretty_company_name(c))}</th>" for c in companies)
    conclusion = parsed.get("conclusion", "")
    conclusion_html = ""
    if isinstance(conclusion, str) and conclusion.strip():
        conclusion_html = (
            "<div class='maturity-conclusion'>"
            "<h4>Conclusion</h4>"
            f"<p>{_escape(conclusion.strip())}</p>"
            "</div>"
        )
    return (
        "<div class='maturity-card'>"
        "<h3>Company Maturity Comparison</h3>"
        "<div class='maturity-table-wrap'>"
        "<table class='maturity-table'>"
        "<thead><tr><th>Attribute</th>"
        f"{header_cells}"
        "</tr></thead>"
        "<tbody>"
        f"{''.join(rows_html)}"
        "</tbody></table></div>"
        f"{conclusion_html}"
        "</div>"
    )
def _render_comparative_table_builder_html(parsed: dict) -> str:
    """Render the "Comparative Table Builder" skill output as an HTML card.

    Builds a report-per-column table of maturity badges and key evidence,
    optional compared-metric chips, a quantitative metric matrix, and an
    optional conclusion. Returns "" when *parsed["reports"]* is empty or
    malformed.
    """
    reports = parsed.get("reports", [])
    if not isinstance(reports, list) or not reports:
        return ""
    header_cells = "".join(f"<th>{_escape(r.get('report', 'Unknown'))}</th>" for r in reports if isinstance(r, dict))
    if not header_cells:
        return ""
    maturity_cells = []
    evidence_cells = []
    for r in reports:
        if not isinstance(r, dict):
            continue
        level = str(r.get("maturity_level", "unknown")).strip()
        low = level.lower()
        # "insufficient" levels get a trailing " x" marker on the badge.
        suffix = " x" if low == "insufficient" else ""
        cls = f"level-{low}" if low in {"high", "moderate", "low", "insufficient"} else "level-unknown"
        maturity_cells.append(f'<td><span class="maturity-badge {cls}">{_escape(level + suffix)}</span></td>')
        evidence = r.get("key_evidence", [])
        if isinstance(evidence, list) and evidence:
            bullets = "".join(f"<li>{_escape(_shorten_text(e, 180))}</li>" for e in evidence[:2])
            evidence_cells.append(f"<td><ul class='maturity-list'>{bullets}</ul></td>")
        else:
            evidence_cells.append("<td><span class='muted'>N/A</span></td>")
    compare_metrics = parsed.get("comparison_metrics", [])
    compare_html = ""
    if isinstance(compare_metrics, list) and compare_metrics:
        chips = "".join(f"<span class='metric-chip'>{_escape(m)}</span>" for m in compare_metrics[:8])
        compare_html = f"<div class='metric-chip-wrap'><strong>Compared metrics:</strong> {chips}</div>"
    matrix_html = _render_quant_metrics_matrix(reports, report_key="report", metrics_key="quant_metrics")
    conclusion = str(parsed.get("conclusion", "")).strip()
    conclusion_html = f"<div class='maturity-conclusion'><h4>Conclusion</h4><p>{_escape(conclusion)}</p></div>" if conclusion else ""
    return (
        "<div class='maturity-card'>"
        "<h3>Comparative Table Builder</h3>"
        "<div class='maturity-table-wrap'><table class='maturity-table'>"
        f"<thead><tr><th>Attribute</th>{header_cells}</tr></thead>"
        "<tbody>"
        f"<tr><th>Maturity Level</th>{''.join(maturity_cells)}</tr>"
        f"<tr><th>Key Evidence</th>{''.join(evidence_cells)}</tr>"
        "</tbody></table></div>"
        f"{compare_html}"
        f"{matrix_html}"
        f"{conclusion_html}"
        "</div>"
    )
| def _format_num(v) -> str: | |
| if v in (None, ""): | |
| return "N/A" | |
| try: | |
| f = float(v) | |
| if f.is_integer(): | |
| return str(int(f)) | |
| return f"{f:.2f}" | |
| except Exception: | |
| return str(v) | |
| def _format_pct(v) -> str: | |
| if v in (None, ""): | |
| return "N/A" | |
| try: | |
| return f"{float(v):.2f}%" | |
| except Exception: | |
| s = str(v) | |
| return s if s.endswith("%") else f"{s}%" | |
def _trend_badge(direction: str) -> str:
    """Render a trend-direction chip; unrecognized directions use the 'unknown' style."""
    label = str(direction or "unknown").strip().lower()
    css = label if label in ("up", "down", "flat") else "unknown"
    return f"<span class='metric-chip trend-{css}'>{_escape(label)}</span>"
def _render_trend_metric_cell(metric_obj: dict) -> str:
    """Render one trend metric as stacked HTML lines for a table cell.

    Shows value (with unit/period), intensity, attainment rate, change
    magnitude, a trend badge and an optional note; non-dict input renders
    a muted "N/A".
    """
    if not isinstance(metric_obj, dict):
        return "<span class='muted'>N/A</span>"
    value = _format_num(metric_obj.get("value"))
    unit = metric_obj.get("unit")
    period = metric_obj.get("period")
    # Value line accretes unit and period only when the value is present.
    value_line = value
    if unit not in (None, "") and value != "N/A":
        value_line = f"{value} {unit}"
    if period not in (None, "") and value_line != "N/A":
        value_line = f"{value_line} ({period})"
    intensity = _format_num(metric_obj.get("intensity"))
    attainment = _format_pct(metric_obj.get("attainment_rate"))
    change = _format_num(metric_obj.get("change_magnitude"))
    note = _shorten_text(metric_obj.get("note", ""), 90)
    note_html = "" if not note else f"<div class='muted'>{_escape(note)}</div>"
    return (
        f"<div><strong>Value:</strong> {_escape(value_line)}</div>"
        f"<div><strong>Intensity:</strong> {_escape(intensity)}</div>"
        f"<div><strong>Attainment:</strong> {_escape(attainment)}</div>"
        f"<div><strong>Change:</strong> {_escape(change)}</div>"
        f"<div><strong>Trend:</strong> {_trend_badge(metric_obj.get('trend_direction', 'unknown'))}</div>"
        f"{note_html}"
    )
def _render_trend_quant_comparator_html(parsed: dict) -> str:
    """Render the "Trend & Quant Comparator" skill output as an HTML card.

    One column per report: a strength-score row, key-evidence bullets, then
    one row per metric (sorted by name) rendered via
    `_render_trend_metric_cell`. Optional metric-highlight chips and a
    conclusion follow the table. Returns "" when no reports are present.
    """
    reports = parsed.get("reports", [])
    if not isinstance(reports, list) or not reports:
        return ""
    report_names = [str(r.get("report", "Unknown")) for r in reports if isinstance(r, dict)]
    if not report_names:
        return ""
    header = "".join(f"<th>{_escape(n)}</th>" for n in report_names)
    strength_cells = []
    evidence_cells = []
    metric_map = {}
    for r in reports:
        if not isinstance(r, dict):
            continue
        rn = str(r.get("report", "Unknown"))
        strength_cells.append(f"<td>{_escape(_format_num(r.get('strength_score')))}</td>")
        ev = r.get("key_evidence", [])
        if isinstance(ev, list) and ev:
            bullets = "".join(f"<li>{_escape(_shorten_text(x, 130))}</li>" for x in ev[:3])
            evidence_cells.append(f"<td><ul class='maturity-list'>{bullets}</ul></td>")
        else:
            evidence_cells.append("<td><span class='muted'>N/A</span></td>")
        # Index metrics by name so each metric becomes one table row.
        for m in r.get("quant_metrics", []) if isinstance(r.get("quant_metrics"), list) else []:
            if not isinstance(m, dict):
                continue
            metric = str(m.get("metric", "")).strip()
            if not metric:
                continue
            metric_map.setdefault(metric, {})[rn] = m
    metric_rows = []
    for metric in sorted(metric_map.keys()):
        cells = []
        for rn in report_names:
            cells.append(f"<td>{_render_trend_metric_cell(metric_map[metric].get(rn, {}))}</td>")
        metric_rows.append(f"<tr><th>{_escape(metric)}</th>{''.join(cells)}</tr>")
    if not metric_rows:
        # Placeholder row spanning all report columns.
        metric_rows = [f"<tr><th>Quant Metrics</th><td colspan='{len(report_names)}'><span class='muted'>No quantitative metrics returned.</span></td></tr>"]
    highlights = parsed.get("metric_highlights", [])
    highlight_html = ""
    if isinstance(highlights, list) and highlights:
        chips = "".join(f"<span class='metric-chip'>{_escape(_shorten_text(x, 60))}</span>" for x in highlights[:10])
        highlight_html = f"<div class='metric-chip-wrap'><strong>Metric Highlights:</strong> {chips}</div>"
    conclusion = str(parsed.get("conclusion", "")).strip()
    conclusion_html = f"<div class='maturity-conclusion'><h4>Conclusion</h4><p>{_escape(conclusion)}</p></div>" if conclusion else ""
    return (
        "<div class='maturity-card'>"
        "<h3>Trend & Quant Comparator</h3>"
        "<div class='maturity-table-wrap'><table class='maturity-table'>"
        f"<thead><tr><th>Attribute</th>{header}</tr></thead>"
        "<tbody>"
        f"<tr><th>Strength Score</th>{''.join(strength_cells)}</tr>"
        f"<tr><th>Key Evidence</th>{''.join(evidence_cells)}</tr>"
        f"{''.join(metric_rows)}"
        "</tbody></table></div>"
        f"{highlight_html}"
        f"{conclusion_html}"
        "</div>"
    )
def _render_benchmark_metric_cell(metric_obj: dict) -> str:
    """Render one target-attainment benchmark metric as HTML table-cell lines.

    Shows baseline/current/target values (with shared unit and per-value
    period), attainment rate, absolute and percentage deltas, intensity,
    a trend badge and an optional note; non-dict input renders a muted
    "N/A".
    """
    if not isinstance(metric_obj, dict):
        return "<span class='muted'>N/A</span>"
    unit = metric_obj.get("unit")
    base_v = _format_num(metric_obj.get("baseline_value"))
    cur_v = _format_num(metric_obj.get("current_value"))
    tgt_v = _format_num(metric_obj.get("target_value"))
    base_p = metric_obj.get("baseline_period")
    cur_p = metric_obj.get("current_period")
    tgt_p = metric_obj.get("target_period")
    def _with_unit(v):
        # Append the shared unit unless the value is missing.
        if v == "N/A":
            return v
        return f"{v} {unit}" if unit not in (None, "") else v
    baseline = _with_unit(base_v)
    current = _with_unit(cur_v)
    target = _with_unit(tgt_v)
    # Periods are appended in parentheses only for present values.
    baseline = f"{baseline} ({base_p})" if base_p not in (None, "") and baseline != "N/A" else baseline
    current = f"{current} ({cur_p})" if cur_p not in (None, "") and current != "N/A" else current
    target = f"{target} ({tgt_p})" if tgt_p not in (None, "") and target != "N/A" else target
    attainment = _format_pct(metric_obj.get("attainment_rate"))
    delta_abs = _format_num(metric_obj.get("delta_abs"))
    delta_pct = _format_pct(metric_obj.get("delta_percent"))
    intensity = _format_num(metric_obj.get("intensity"))
    note = _shorten_text(metric_obj.get("note", ""), 90)
    note_html = "" if not note else f"<div class='muted'>{_escape(note)}</div>"
    return (
        f"<div><strong>Baseline:</strong> {_escape(baseline)}</div>"
        f"<div><strong>Current:</strong> {_escape(current)}</div>"
        f"<div><strong>Target:</strong> {_escape(target)}</div>"
        f"<div><strong>Attainment:</strong> {_escape(attainment)}</div>"
        f"<div><strong>Delta (abs/%):</strong> {_escape(delta_abs)} / {_escape(delta_pct)}</div>"
        f"<div><strong>Intensity:</strong> {_escape(intensity)}</div>"
        f"<div><strong>Trend:</strong> {_trend_badge(metric_obj.get('trend_direction', 'unknown'))}</div>"
        f"{note_html}"
    )
def _render_target_attainment_delta_html(parsed: dict) -> str:
    """Render the 'Target Attainment & Delta Benchmark' skill payload as an HTML card.

    Expects ``parsed`` to carry a ``reports`` list of dicts; returns "" when the
    payload does not match that shape so the caller can fall back to other renderers.
    """
    reports = parsed.get("reports", [])
    if not isinstance(reports, list) or not reports:
        return ""
    report_names = [str(r.get("report", "Unknown")) for r in reports if isinstance(r, dict)]
    if not report_names:
        return ""
    header = "".join(f"<th>{_escape(n)}</th>" for n in report_names)
    strength_cells = []
    evidence_cells = []
    # metric name -> {report name -> metric dict}; pivots benchmarks into per-metric rows.
    metric_map = {}
    for r in reports:
        if not isinstance(r, dict):
            continue
        rn = str(r.get("report", "Unknown"))
        strength = str(r.get("overall_strength", "insufficient")).strip()
        low = strength.lower()
        # " x" visually marks an insufficient rating.
        suffix = " x" if low == "insufficient" else ""
        cls = f"level-{low}" if low in {"high", "moderate", "low", "insufficient"} else "level-unknown"
        strength_cells.append(f'<td><span class="maturity-badge {cls}">{_escape(strength + suffix)}</span></td>')
        ev = r.get("key_evidence", [])
        if isinstance(ev, list) and ev:
            # Show at most three shortened evidence bullets per report.
            bullets = "".join(f"<li>{_escape(_shorten_text(x, 130))}</li>" for x in ev[:3])
            evidence_cells.append(f"<td><ul class='maturity-list'>{bullets}</ul></td>")
        else:
            evidence_cells.append("<td><span class='muted'>N/A</span></td>")
        for m in r.get("benchmarks", []) if isinstance(r.get("benchmarks"), list) else []:
            if not isinstance(m, dict):
                continue
            metric = str(m.get("metric", "")).strip()
            if not metric:
                continue
            metric_map.setdefault(metric, {})[rn] = m
    metric_rows = []
    for metric in sorted(metric_map.keys()):
        cells = []
        for rn in report_names:
            # Missing report/metric combinations render as an empty metric cell.
            cells.append(f"<td>{_render_benchmark_metric_cell(metric_map[metric].get(rn, {}))}</td>")
        metric_rows.append(f"<tr><th>{_escape(metric)}</th>{''.join(cells)}</tr>")
    if not metric_rows:
        metric_rows = [f"<tr><th>Benchmarks</th><td colspan='{len(report_names)}'><span class='muted'>No benchmark metrics returned.</span></td></tr>"]
    leaderboard = parsed.get("leaderboard", [])
    leaderboard_html = ""
    if isinstance(leaderboard, list) and leaderboard:
        rows = []
        # Cap the leaderboard at six entries.
        for item in leaderboard[:6]:
            if not isinstance(item, dict):
                continue
            rows.append(
                "<tr>"
                f"<td>{_escape(item.get('report', ''))}</td>"
                f"<td>{_escape(_format_num(item.get('score')))}</td>"
                f"<td>{_escape(_shorten_text(item.get('reason', ''), 120))}</td>"
                "</tr>"
            )
        if rows:
            leaderboard_html = (
                "<div class='maturity-table-wrap' style='margin-top:10px;'>"
                "<table class='maturity-table'>"
                "<thead><tr><th>Leaderboard Report</th><th>Score</th><th>Reason</th></tr></thead>"
                f"<tbody>{''.join(rows)}</tbody>"
                "</table></div>"
            )
    conclusion = str(parsed.get("conclusion", "")).strip()
    conclusion_html = f"<div class='maturity-conclusion'><h4>Conclusion</h4><p>{_escape(conclusion)}</p></div>" if conclusion else ""
    return (
        "<div class='maturity-card'>"
        "<h3>Target Attainment & Delta Benchmark</h3>"
        "<div class='maturity-table-wrap'><table class='maturity-table'>"
        f"<thead><tr><th>Attribute</th>{header}</tr></thead>"
        "<tbody>"
        f"<tr><th>Overall Strength</th>{''.join(strength_cells)}</tr>"
        f"<tr><th>Key Evidence</th>{''.join(evidence_cells)}</tr>"
        f"{''.join(metric_rows)}"
        "</tbody></table></div>"
        f"{leaderboard_html}"
        f"{conclusion_html}"
        "</div>"
    )
def _render_compliance_checklist_html(parsed: dict) -> str:
    """Render the 'Compliance Checklist' skill payload as an HTML card.

    Builds one column per report with a pass/partial/fail summary row, key
    evidence, and one row per required checklist item. Returns "" when the
    payload lacks a usable ``reports`` list.
    """
    reports = parsed.get("reports", [])
    if not isinstance(reports, list) or not reports:
        return ""
    required_items = parsed.get("required_checks", [])
    if not isinstance(required_items, list):
        required_items = []
    report_names = [str(r.get("report", "Unknown")) for r in reports if isinstance(r, dict)]
    if not report_names:
        return ""
    header = "".join(f"<th>{_escape(n)}</th>" for n in report_names)
    summary_cells = []
    evidence_cells = []
    for r in reports:
        if not isinstance(r, dict):
            continue
        s = r.get("summary", {}) if isinstance(r.get("summary"), dict) else {}
        summary_cells.append(
            "<td>"
            f"pass={_escape(s.get('pass', 0))}, partial={_escape(s.get('partial', 0))}, fail={_escape(s.get('fail', 0))}"
            f"<br>completion={_escape(s.get('completion_rate', 'N/A'))}"
            "</td>"
        )
        ev = r.get("key_evidence", [])
        if isinstance(ev, list) and ev:
            # Show at most three shortened evidence bullets per report.
            bullets = "".join(f"<li>{_escape(_shorten_text(x, 130))}</li>" for x in ev[:3])
            evidence_cells.append(f"<td><ul class='maturity-list'>{bullets}</ul></td>")
        else:
            evidence_cells.append("<td><span class='muted'>N/A</span></td>")
    item_rows = []
    for item in required_items:
        cells = []
        for r in reports:
            # Defaults used when the report has no matching check entry.
            status = "insufficient"
            quant = "N/A"
            note = ""
            checks = r.get("checks", []) if isinstance(r, dict) else []
            if isinstance(checks, list):
                for c in checks:
                    if not isinstance(c, dict):
                        continue
                    # Case-insensitive match of the checklist item name.
                    if str(c.get("item", "")).strip().lower() == str(item).strip().lower():
                        status = str(c.get("status", "insufficient"))
                        qv = c.get("quant_value", None)
                        qu = c.get("quant_unit", None)
                        quant = "N/A" if qv in (None, "") else f"{qv}{'' if qu in (None, '') else ' ' + str(qu)}"
                        note = _shorten_text(c.get("note", ""), 90)
                        break
            # " x" visually marks failing/insufficient statuses.
            mark = " x" if status.lower() in {"fail", "insufficient"} else ""
            cells.append(f"<td><strong>{_escape(status + mark)}</strong><br>{_escape(quant)}<br><span class='muted'>{_escape(note)}</span></td>")
        item_rows.append(f"<tr><th>{_escape(item)}</th>{''.join(cells)}</tr>")
    matrix_html = _render_quant_metrics_matrix(reports, report_key="report", metrics_key="quant_metrics")
    conclusion = str(parsed.get("conclusion", "")).strip()
    conclusion_html = f"<div class='maturity-conclusion'><h4>Conclusion</h4><p>{_escape(conclusion)}</p></div>" if conclusion else ""
    return (
        "<div class='maturity-card'>"
        "<h3>Compliance Checklist</h3>"
        "<div class='maturity-table-wrap'><table class='maturity-table'>"
        f"<thead><tr><th>Checklist Item</th>{header}</tr></thead>"
        "<tbody>"
        f"<tr><th>Summary</th>{''.join(summary_cells)}</tr>"
        f"<tr><th>Key Evidence</th>{''.join(evidence_cells)}</tr>"
        f"{''.join(item_rows)}"
        "</tbody></table></div>"
        f"{matrix_html}"
        f"{conclusion_html}"
        "</div>"
    )
def _render_dimension_extractor_html(parsed: dict) -> str:
    """Render the 'Dimension Extractor' payload as an HTML comparison card.

    One column per report; rows show per-bucket counts, the coverage level
    badge, and key evidence. Returns "" when no usable reports are present.
    """
    reports = parsed.get("reports", [])
    if not isinstance(reports, list) or not reports:
        return ""
    names = [str(rep.get("report", "Unknown")) for rep in reports if isinstance(rep, dict)]
    if not names:
        return ""
    header = "".join(f"<th>{_escape(name)}</th>" for name in names)
    bucket_rows = []
    for bucket in ("Process", "Input", "Output", "Outcome", "Governance", "Risk"):
        count_cells = []
        for rep in reports:
            counts = rep.get("bucket_counts", {}) if isinstance(rep, dict) and isinstance(rep.get("bucket_counts"), dict) else {}
            count_cells.append(f"<td>{_escape(counts.get(bucket, 0))}</td>")
        bucket_rows.append(f"<tr><th>{_escape(bucket)}</th>{''.join(count_cells)}</tr>")
    coverage_cells = []
    evidence_cells = []
    for rep in reports:
        level = str(rep.get("coverage_level", "unknown")) if isinstance(rep, dict) else "unknown"
        level_low = level.lower()
        # " x" visually marks an insufficient coverage level.
        marker = " x" if level_low == "insufficient" else ""
        badge_cls = f"level-{level_low}" if level_low in {"high", "moderate", "low", "insufficient"} else "level-unknown"
        coverage_cells.append(f'<td><span class="maturity-badge {badge_cls}">{_escape(level + marker)}</span></td>')
        evidence = rep.get("key_evidence", []) if isinstance(rep, dict) else []
        if isinstance(evidence, list) and evidence:
            items = "".join(f"<li>{_escape(_shorten_text(entry, 130))}</li>" for entry in evidence[:3])
            evidence_cells.append(f"<td><ul class='maturity-list'>{items}</ul></td>")
        else:
            evidence_cells.append("<td><span class='muted'>N/A</span></td>")
    matrix_html = _render_quant_metrics_matrix(reports, report_key="report", metrics_key="quant_metrics")
    conclusion = str(parsed.get("conclusion", "")).strip()
    conclusion_html = f"<div class='maturity-conclusion'><h4>Conclusion</h4><p>{_escape(conclusion)}</p></div>" if conclusion else ""
    return (
        "<div class='maturity-card'>"
        "<h3>Dimension Extractor</h3>"
        "<div class='maturity-table-wrap'><table class='maturity-table'>"
        f"<thead><tr><th>Bucket</th>{header}</tr></thead>"
        "<tbody>"
        f"{''.join(bucket_rows)}"
        f"<tr><th>Coverage Level</th>{''.join(coverage_cells)}</tr>"
        f"<tr><th>Key Evidence</th>{''.join(evidence_cells)}</tr>"
        "</tbody></table></div>"
        f"{matrix_html}"
        f"{conclusion_html}"
        "</div>"
    )
def _render_consistency_check_html(parsed: dict) -> str:
    """Render the 'Contradiction/Consistency Check' payload as an HTML card.

    Shows one row per rule check, an optional key-evidence table, and the
    consistent/inconsistent/insufficient score summary.
    """
    checks = parsed.get("checks", [])
    scores = parsed.get("scores", {}) if isinstance(parsed.get("scores"), dict) else {}
    if not isinstance(checks, list):
        checks = []
    check_rows = []
    for c in checks:
        if not isinstance(c, dict):
            continue
        result = str(c.get("result", "insufficient")).strip()
        # " x" visually marks inconsistent/insufficient results.
        mark = " x" if result.lower() in {"inconsistent", "insufficient"} else ""
        check_rows.append(
            "<tr>"
            f"<td>{_escape(c.get('rule', ''))}</td>"
            f"<td>{_escape(result + mark)}</td>"
            f"<td>{_escape(_shorten_text(c.get('note', ''), 180))}</td>"
            "</tr>"
        )
    check_rows_html = "".join(check_rows) if check_rows else "<tr><td colspan='3'>No checks returned.</td></tr>"
    key_evidence = parsed.get("key_evidence", [])
    key_evidence_html = ""
    if isinstance(key_evidence, list) and key_evidence:
        # Show at most six shortened evidence bullets.
        bullets = "".join(f"<li>{_escape(_shorten_text(x, 180))}</li>" for x in key_evidence[:6])
        key_evidence_html = (
            "<div class='maturity-table-wrap' style='margin-top:10px;'><table class='maturity-table'>"
            "<thead><tr><th>Key Evidence</th></tr></thead>"
            f"<tbody><tr><td><ul class='maturity-list'>{bullets}</ul></td></tr></tbody>"
            "</table></div>"
        )
    conclusion = str(parsed.get("conclusion", "")).strip()
    conclusion_html = f"<div class='maturity-conclusion'><h4>Conclusion</h4><p>{_escape(conclusion)}</p></div>" if conclusion else ""
    return (
        "<div class='maturity-card'>"
        "<h3>Contradiction / Consistency Check</h3>"
        "<div class='maturity-table-wrap'><table class='maturity-table'>"
        "<thead><tr><th>Rule</th><th>Result</th><th>Note</th></tr></thead>"
        f"<tbody>{check_rows_html}</tbody>"
        "</table></div>"
        f"{key_evidence_html}"
        "<div class='maturity-table-wrap' style='margin-top:10px;'><table class='maturity-table'>"
        "<thead><tr><th>Score Item</th><th>Value</th></tr></thead>"
        "<tbody>"
        f"<tr><th>consistent</th><td>{_escape(scores.get('consistent', 0))}</td></tr>"
        f"<tr><th>inconsistent</th><td>{_escape(scores.get('inconsistent', 0))}</td></tr>"
        f"<tr><th>insufficient</th><td>{_escape(scores.get('insufficient', 0))}</td></tr>"
        f"<tr><th>consistency_rate</th><td>{_escape(scores.get('consistency_rate', 'N/A'))}</td></tr>"
        "</tbody></table></div>"
        f"{conclusion_html}"
        "</div>"
    )
def _render_consensus_count_html(parsed: dict) -> str:
    """Render the 'Consensus/Count (Portfolio Statistics)' payload as HTML.

    Shows explicit/partial/missing counts with percentages, a per-report label
    table with key evidence, a quantitative metrics matrix, and consensus vs
    outlier bullet lists.
    """
    counts = parsed.get("counts", {}) if isinstance(parsed.get("counts"), dict) else {}
    percentages = parsed.get("percentages", {}) if isinstance(parsed.get("percentages"), dict) else {}
    per_report = parsed.get("per_report", [])
    if not isinstance(per_report, list):
        per_report = []
    # Map report name -> evidence list supplied via the optional side channel.
    ev_map = {}
    key_evidence_by_report = parsed.get("key_evidence_by_report", [])
    if isinstance(key_evidence_by_report, list):
        for row in key_evidence_by_report:
            if not isinstance(row, dict):
                continue
            report = str(row.get("report", "")).strip()
            ev = row.get("key_evidence", [])
            if report and isinstance(ev, list):
                ev_map[report] = ev
    report_rows = []
    for r in per_report:
        if not isinstance(r, dict):
            continue
        label = str(r.get("label", "insufficient"))
        # " x" visually marks missing/insufficient labels.
        mark = " x" if label.lower() in {"missing", "insufficient"} else ""
        report = str(r.get("report", ""))
        # Prefer the side-channel evidence; fall back to the row's own field.
        ev = ev_map.get(report, r.get("key_evidence", []))
        ev_html = "<span class='muted'>N/A</span>"
        if isinstance(ev, list) and ev:
            ev_html = "<ul class='maturity-list'>" + "".join(f"<li>{_escape(_shorten_text(x, 120))}</li>" for x in ev[:2]) + "</ul>"
        report_rows.append("<tr><td>{}</td><td>{}</td><td>{}</td></tr>".format(_escape(report), _escape(label + mark), ev_html))
    report_rows_html = "".join(report_rows) if report_rows else "<tr><td colspan='3'>No report labels returned.</td></tr>"
    matrix_html = _render_quant_metrics_matrix(per_report, report_key="report", metrics_key="quant_metrics")
    consensus_items = parsed.get("consensus_items", [])
    outliers = parsed.get("outliers", [])
    consensus_html = "".join(f"<li>{_escape(_shorten_text(x, 140))}</li>" for x in consensus_items[:6]) if isinstance(consensus_items, list) else ""
    outliers_html = "".join(f"<li>{_escape(_shorten_text(x, 140))}</li>" for x in outliers[:6]) if isinstance(outliers, list) else ""
    conclusion = str(parsed.get("conclusion", "")).strip()
    conclusion_html = f"<div class='maturity-conclusion'><h4>Conclusion</h4><p>{_escape(conclusion)}</p></div>" if conclusion else ""
    return (
        "<div class='maturity-card'>"
        "<h3>Consensus / Count (Portfolio Statistics)</h3>"
        "<div class='maturity-table-wrap'><table class='maturity-table'>"
        "<thead><tr><th>Count Item</th><th>Value</th></tr></thead>"
        "<tbody>"
        f"<tr><th>explicit</th><td>{_escape(counts.get('explicit', 0))} ({_escape(percentages.get('explicit', 'N/A'))}%)</td></tr>"
        f"<tr><th>partial</th><td>{_escape(counts.get('partial', 0))} ({_escape(percentages.get('partial', 'N/A'))}%)</td></tr>"
        f"<tr><th>missing</th><td>{_escape(counts.get('missing', 0))} ({_escape(percentages.get('missing', 'N/A'))}%)</td></tr>"
        f"<tr><th>total</th><td>{_escape(counts.get('total', len(per_report)))}</td></tr>"
        "</tbody></table></div>"
        "<div class='maturity-table-wrap' style='margin-top:10px;'><table class='maturity-table'>"
        "<thead><tr><th>Report</th><th>Label</th><th>Key Evidence</th></tr></thead>"
        f"<tbody>{report_rows_html}</tbody>"
        "</table></div>"
        f"{matrix_html}"
        "<div class='maturity-split'>"
        f"<div><h4>Consensus Items</h4><ul class='maturity-list'>{consensus_html or '<li>None</li>'}</ul></div>"
        f"<div><h4>Outliers</h4><ul class='maturity-list'>{outliers_html or '<li>None</li>'}</ul></div>"
        "</div>"
        f"{conclusion_html}"
        "</div>"
    )
| def _coerce_payload_for_ui(payload): | |
| if isinstance(payload, list): | |
| if payload and all(isinstance(x, dict) and str(x.get("report", "")).strip() for x in payload): | |
| return { | |
| "skill": "Comparative Table Builder", | |
| "reports": payload, | |
| } | |
| return payload | |
| if not isinstance(payload, dict): | |
| return payload | |
| if isinstance(payload.get("reports"), list) or isinstance(payload.get("maturity_comparison"), dict): | |
| return payload | |
| is_single_report_record = ( | |
| str(payload.get("report", "")).strip() != "" | |
| and any(k in payload for k in ("maturity_level", "key_evidence", "quant_metrics", "comparison_metrics", "year")) | |
| and not any(k in payload for k in ("answer", "explanation", "evidence_ids", "rows", "ranking")) | |
| ) | |
| if not is_single_report_record: | |
| return payload | |
| report_item = { | |
| "report": payload.get("report", "Unknown"), | |
| "year": payload.get("year", None), | |
| "maturity_level": payload.get("maturity_level", "unknown"), | |
| "key_evidence": payload.get("key_evidence", []) if isinstance(payload.get("key_evidence"), list) else [], | |
| "quant_metrics": payload.get("quant_metrics", []) if isinstance(payload.get("quant_metrics"), list) else [], | |
| } | |
| normalized = { | |
| "skill": str(payload.get("skill", "")).strip() or "Comparative Table Builder", | |
| "reports": [report_item], | |
| } | |
| if "dimension" in payload: | |
| normalized["dimension"] = payload.get("dimension") | |
| if isinstance(payload.get("comparison_metrics"), list): | |
| normalized["comparison_metrics"] = payload.get("comparison_metrics") | |
| if "conclusion" in payload: | |
| normalized["conclusion"] = payload.get("conclusion") | |
| if "confidence_distribution" in payload: | |
| normalized["confidence_distribution"] = payload.get("confidence_distribution") | |
| return normalized | |
def _render_skill_specific_html(parsed: dict) -> str:
    """Dispatch a parsed payload to the HTML renderer matching its skill.

    Tries the legacy maturity-comparison layout first; otherwise looks up the
    renderer for the normalized skill name. Returns "" when no renderer applies
    (or the matched renderer produced nothing), so callers can fall back to
    generic markdown formatting.
    """
    legacy = _render_maturity_comparison_html(parsed)
    if legacy:
        return _attach_confidence_block(legacy, parsed)
    # Fix: the local result used to be named `html`, shadowing the stdlib
    # `html` module imported at the top of the file; renamed and the if-chain
    # replaced with an explicit skill -> renderer dispatch table.
    renderers = {
        "Trend & Quant Comparator": _render_trend_quant_comparator_html,
        "Target Attainment & Delta Benchmark": _render_target_attainment_delta_html,
        "Comparative Table Builder": _render_comparative_table_builder_html,
        "Compliance Checklist": _render_compliance_checklist_html,
        "Dimension Extractor": _render_dimension_extractor_html,
        "Contradiction/Consistency Check": _render_consistency_check_html,
        "Consensus/Count (Portfolio Statistics)": _render_consensus_count_html,
    }
    skill = _normalize_skill_for_render(parsed)
    renderer = renderers.get(skill)
    rendered = renderer(parsed) if renderer else ""
    if not rendered:
        return ""
    return _attach_confidence_block(rendered, parsed)
| def _extract_json_payload(text: str): | |
| """Extract a JSON value from mixed model output text.""" | |
| if not text: | |
| return None | |
| decoder = json.JSONDecoder() | |
| # 1) Whole string is JSON. | |
| try: | |
| return json.loads(text) | |
| except Exception: | |
| pass | |
| # 2) JSON fenced blocks. | |
| for block in re.findall(r"```(?:json)?\s*([\s\S]*?)```", text, flags=re.IGNORECASE): | |
| try: | |
| return json.loads(block.strip()) | |
| except Exception: | |
| continue | |
| # 3) Marker-based extraction. | |
| marker = "Final Answer in JSON:" | |
| if marker in text: | |
| tail = text.split(marker, 1)[1].strip() | |
| if tail: | |
| try: | |
| obj, _ = decoder.raw_decode(tail) | |
| return obj | |
| except Exception: | |
| pass | |
| # 4) Scan every '{' and try raw_decode on that suffix. | |
| candidates = [] | |
| for i, ch in enumerate(text): | |
| if ch != "{": | |
| continue | |
| try: | |
| obj, end = decoder.raw_decode(text[i:]) | |
| consumed = text[i:i + end] | |
| candidates.append((obj, len(consumed))) | |
| except Exception: | |
| continue | |
| if not candidates: | |
| return None | |
| # Prefer dicts with known answer schemas, else the largest parsed candidate. | |
| def _score(item): | |
| obj, consumed_len = item | |
| schema_bonus = 0 | |
| if isinstance(obj, dict): | |
| if any(k in obj for k in ("dimension", "rows", "ranking")): | |
| schema_bonus += 10 | |
| if any(k in obj for k in ("answer", "explanation", "evidence_ids")): | |
| schema_bonus += 10 | |
| if "maturity_comparison" in obj: | |
| schema_bonus += 15 | |
| if any(k in obj for k in ("reports", "counts", "checks", "bucket_counts", "per_report")): | |
| schema_bonus += 12 | |
| if "skill" in obj: | |
| schema_bonus += 8 | |
| if "confidence_distribution" in obj: | |
| schema_bonus += 6 | |
| return (schema_bonus, consumed_len) | |
| candidates.sort(key=_score, reverse=True) | |
| return candidates[0][0] | |
def _format_answer(answer: str) -> str:
    """Format a raw model answer for display in the UI.

    Tries, in order: a bare confidence-distribution dict, plain text with an
    inline confidence distribution, skill-specific HTML renderers, and a
    generic markdown layout; finally falls back to pretty-printed JSON.
    """
    if not answer:
        return ""
    parsed = _coerce_payload_for_ui(_extract_json_payload(answer))
    if isinstance(parsed, dict):
        low_keys = {str(k).strip().lower() for k in parsed.keys()}
        # Payload is ONLY a confidence distribution ({high/medium/low: ...}).
        if low_keys and low_keys.issubset({"high", "medium", "low"}):
            conf = _normalize_confidence_distribution(parsed)
            if conf:
                # Strip the inline "confidence distribution: {...}" text before display.
                cleaned = re.sub(
                    r"confidence\s*distribution\s*:\s*\{[^{}]+\}",
                    "",
                    str(answer),
                    flags=re.IGNORECASE,
                ).strip()
                body = _escape(cleaned).replace("\n", "<br>")
                return f"<div>{body}</div>{_render_confidence_distribution(conf)}"
    if parsed is None:
        # No JSON payload found: render as plain text, still surfacing any
        # confidence distribution mentioned inline.
        conf, cleaned = _extract_confidence_distribution_from_text(answer)
        conf_html = _render_confidence_distribution(conf)
        if conf_html:
            body = _escape(cleaned).replace("\n", "<br>")
            return f"<div>{body}</div>{conf_html}"
        return answer
    if not isinstance(parsed, dict):
        # Lists / scalars: show the raw JSON in a fenced block.
        return f"```json\n{json.dumps(parsed, ensure_ascii=False, indent=2)}\n```"
    skill_html = _render_skill_specific_html(parsed)
    if skill_html:
        return skill_html
    # Generic markdown layout assembled from known payload fields.
    parts = []
    dimension = parsed.get("dimension")
    if dimension:
        parts.append(f"**Dimension:** {_md_cell(dimension)}")
    rows = parsed.get("rows")
    if isinstance(rows, list) and rows:
        table_rows = []
        for item in rows:
            if not isinstance(item, dict):
                continue
            table_rows.append({
                "Report": item.get("report", ""),
                "Year": item.get("year", ""),
                "Status": item.get("disclosure_status", ""),
                "Key Points": len(item.get("key_points") or []),
                "Evidence": len(item.get("evidence_chunks") or []),
            })
        if table_rows:
            parts.append("### Comparison")
            parts.append(_md_table(table_rows, ["Report", "Year", "Status", "Key Points", "Evidence"]))
    ranking = parsed.get("ranking")
    if isinstance(ranking, list) and ranking:
        ranking_rows = []
        for item in ranking:
            if not isinstance(item, dict):
                continue
            ranking_rows.append({
                "Rank": item.get("rank", ""),
                "Report": item.get("report", ""),
                "Rationale": _truncate(item.get("rationale", "")),
            })
        if ranking_rows:
            parts.append("### Ranking")
            parts.append(_md_table(ranking_rows, ["Rank", "Report", "Rationale"]))
    conclusion = parsed.get("conclusion")
    if isinstance(conclusion, str) and conclusion.strip():
        parts.append("### Conclusion")
        parts.append(_md_cell(conclusion))
    # Generic JSON-answer schema fallback.
    if "answer" in parsed or "explanation" in parsed or "evidence_ids" in parsed:
        ans = parsed.get("answer", "")
        exp = parsed.get("explanation", "")
        if ans:
            parts.append("### Answer")
            parts.append(_md_cell(ans))
        if exp:
            parts.append("### Explanation")
            parts.append(_md_cell(exp))
        ev_ids = parsed.get("evidence_ids")
        if isinstance(ev_ids, list) and ev_ids:
            parts.append(f"### Retrieved Sources Count\n{len(ev_ids)}")
    # Show remaining scalar fields in a compact table.
    skip_keys = {"answer", "explanation", "evidence_ids", "dimension", "rows", "ranking"}
    extra_rows = []
    for k, v in parsed.items():
        if k in skip_keys:
            continue
        if isinstance(v, (str, int, float, bool)) or v is None:
            extra_rows.append({"Field": k, "Value": _md_cell(v)})
    if extra_rows:
        parts.append("### Extra Fields")
        parts.append(_md_table(extra_rows, ["Field", "Value"]))
    conf_html = _render_confidence_distribution(parsed.get("confidence_distribution"))
    if conf_html:
        parts.append(conf_html)
    if parts:
        return "\n\n".join(parts)
    return f"```json\n{json.dumps(parsed, ensure_ascii=False, indent=2)}\n```"
| # ======================== Handlers ======================== | |
def on_doc_mode_change(doc_mode):
    """Swap the question placeholder and hint text when the document mode toggles."""
    single = doc_mode == "Single-document"
    placeholder = PLACEHOLDER_SINGLE if single else PLACEHOLDER_MULTI
    example = (
        '"For [Report Name], does the company ...?"'
        if single
        else '"For [Report 1 Name] and [Report 2 Name], does ...?"'
    )
    hint = (
        '<p class="hint-text">'
        '\U0001f4a1 Tip: We recommend prefixing your question with the report name, '
        f'e.g. <i>{example}</i>'
        '</p>'
    )
    return (
        gr.update(placeholder=placeholder, value=""),
        gr.update(value=hint, visible=True),
    )
def on_model_selection_change(gen_model, embed_model):
    """Show or hide the OpenAI / Gemini key inputs based on the selected models."""
    api_generation = "(API)" in str(gen_model)
    gemini_generation = api_generation and _is_gemini_generation_model(gen_model)
    openai_embedding = str(embed_model) in OPENAI_EMBED_MODELS_SET
    # OpenAI key needed for OpenAI embeddings or non-Gemini API generation.
    show_openai = openai_embedding or (api_generation and not gemini_generation)
    return (
        gr.update(visible=show_openai),
        gr.update(visible=gemini_generation),
    )
def on_report_select(report_name):
    """Render page 1 of the chosen report PDF and reset the pager controls."""
    if not report_name:
        return (
            "<p>No report selected.</p>",
            1,
            1,
            "Page: 1 / 1",
            gr.update(interactive=False),
            gr.update(interactive=False),
        )
    page_count = _get_pdf_total_pages(report_name)
    # Start on page 1: "previous" disabled, "next" enabled only for multi-page PDFs.
    return (
        _pdf_iframe(report_name, page=1),
        1,
        page_count,
        f"Page: 1 / {page_count}",
        gr.update(interactive=False),
        gr.update(interactive=page_count > 1),
    )
def on_prev_page(report_name, current_page, total_pages):
    """Step the PDF viewer back one page.

    Fix: the new page is now clamped to ``[1, total]`` (mirroring
    ``on_next_page``), so a stale ``current_page`` larger than the page count
    can no longer leave the viewer past the last page.
    """
    if not report_name:
        return (
            "<p>No report selected.</p>",
            1,
            1,
            "Page: 1 / 1",
            gr.update(interactive=False),
            gr.update(interactive=False),
        )
    total = max(1, int(total_pages or 1))
    # Clamp to the valid range instead of only flooring at 1.
    page = min(total, max(1, int(current_page or 1) - 1))
    return (
        _pdf_iframe(report_name, page=page),
        page,
        total,
        f"Page: {page} / {total}",
        gr.update(interactive=page > 1),
        gr.update(interactive=page < total),
    )
def on_next_page(report_name, current_page, total_pages):
    """Advance the PDF viewer one page, never moving past the last page."""
    if not report_name:
        return (
            "<p>No report selected.</p>",
            1,
            1,
            "Page: 1 / 1",
            gr.update(interactive=False),
            gr.update(interactive=False),
        )
    page_count = max(1, int(total_pages or 1))
    next_page = min(page_count, max(1, int(current_page or 1) + 1))
    return (
        _pdf_iframe(report_name, page=next_page),
        next_page,
        page_count,
        f"Page: {next_page} / {page_count}",
        gr.update(interactive=next_page > 1),
        gr.update(interactive=next_page < page_count),
    )
def on_run_start():
    """Return the placeholder answer/timing/pipeline content shown while a run starts."""
    waiting = "Waiting......"
    return f"## {waiting}", "", _render_waiting(waiting)
| def _has_openai_api_key(local_api_key: str) -> bool: | |
| if str(local_api_key or "").strip(): | |
| return True | |
| if os.getenv("OPENAI_API_KEY", "").strip(): | |
| return True | |
| if os.getenv("OPENAI_API_KEY_88996", "").strip(): | |
| return True | |
| return False | |
| def _has_gemini_api_key(local_api_key: str) -> bool: | |
| if str(local_api_key or "").strip(): | |
| return True | |
| if os.getenv("GEMINI_API_KEY", "").strip(): | |
| return True | |
| return False | |
def do_query(question, doc_mode_label, rag_mode, embed_model,
             gen_model, openai_api_key, gemini_api_key, top_k):
    """Run a query through the selected RAG pipeline (streaming Gradio handler).

    Generator that yields UI update tuples shaped as
    ``(answer_md, evidence_md, status, timing_md, pipeline_md,
       contexts_state, claim_trace_state, *claim_button_updates)`` so that
    intermediate pipeline progress streams to the outputs.
    """
    empty_btns = _default_claim_button_updates()
    empty_state_contexts = []
    empty_state_trace = []
    openai_key = str(openai_api_key or "").strip()
    gemini_key = str(gemini_api_key or "").strip()
    # ---------- Input / configuration validation ----------
    if not question or not question.strip():
        yield "\u26a0\ufe0f Please enter a question.", "", "", "", "", empty_state_contexts, empty_state_trace, *empty_btns
        return
    # Local generation needs a GPU; otherwise an API model must be selected.
    if (not HAS_GPU) and ("(API)" not in str(gen_model)):
        msg = "\u26a0\ufe0f No GPU detected. Please use an API generation model."
        yield msg, "", msg, "", "", empty_state_contexts, empty_state_trace, *empty_btns
        return
    if (str(embed_model) in OPENAI_EMBED_MODELS_SET) and (not _has_openai_api_key(openai_key)):
        msg = (
            "\u26a0\ufe0f OpenAI embedding model selected but API key is missing. "
            "Please input API key or set OPENAI_API_KEY."
        )
        yield msg, "", msg, "", "", empty_state_contexts, empty_state_trace, *empty_btns
        return
    if "(API)" in str(gen_model):
        if _is_gemini_generation_model(gen_model):
            if not _has_gemini_api_key(gemini_key):
                msg = (
                    "\u26a0\ufe0f Gemini API generation model selected but API key is missing. "
                    "Please input API key or set GEMINI_API_KEY."
                )
                yield msg, "", msg, "", "", empty_state_contexts, empty_state_trace, *empty_btns
                return
        elif not _has_openai_api_key(openai_key):
            msg = (
                "\u26a0\ufe0f OpenAI API generation model selected but API key is missing. "
                "Please input API key or set OPENAI_API_KEY."
            )
            yield msg, "", msg, "", "", empty_state_contexts, empty_state_trace, *empty_btns
            return
    # Export user-supplied keys so backend clients can pick them up from the env.
    if openai_key:
        os.environ["OPENAI_API_KEY"] = openai_key
    if gemini_key:
        os.environ["GEMINI_API_KEY"] = gemini_key
    backend_api_key = openai_key
    if (not backend_api_key) and _is_gemini_generation_model(gen_model):
        backend_api_key = gemini_key
    doc_mode = "single" if doc_mode_label == "Single-document" else "multi"
    rag_mode = str(rag_mode or "ClimateRAG")
    q = question.strip()
    try:
        base_top_k = max(1, int(top_k))
    except Exception:
        base_top_k = 5  # sane default when top_k is not numeric
    t0 = time.perf_counter()
    # ---------- Baseline (non-ClimateRAG) single-step pipeline ----------
    if rag_mode != "ClimateRAG":
        answer, contexts = run_rag(
            question=q,
            chunk_mode="length",
            doc_mode=doc_mode,
            top_k=base_top_k,
            embed_name=embed_model,
            gen_model=gen_model,
            api_key=backend_api_key,
        )
        elapsed = time.perf_counter() - t0
        answer_md = _format_answer(answer)
        evidence_md = _format_evidence(contexts)
        status = f"\u2705 Baseline RAG complete: retrieved {len(contexts)} passages."
        timing_md = f"\u23f1\ufe0f **Elapsed:** `{elapsed:.2f}s`"
        pipeline_md = (
            "## Baseline RAG\n"
            f"- Retrieved **{len(contexts)}** passages\n"
            "- Single-step retrieval + generation completed."
        )
        yield answer_md, evidence_md, status, timing_md, pipeline_md, contexts, [], *empty_btns
        return
    # ---------- ClimateRAG three-step pipeline ----------
    answer_md = "*Waiting for STEP 2 answer...*"
    evidence_md = "*Waiting for retrieval...*"
    status = "⏳ ClimateRAG pipeline started."
    timing_md = ""
    pipeline_md = _render_waiting("STEP 1 Waiting......")
    yield answer_md, evidence_md, status, timing_md, pipeline_md, empty_state_contexts, empty_state_trace, *empty_btns
    try:
        # ---------- Step 1: retrieval + clustering ----------
        step1 = run_trustworthy_step1(
            question=q,
            doc_mode=doc_mode,
            top_k=base_top_k,
            embed_name=embed_model,
            gen_model=gen_model,
            api_key=backend_api_key,
        )
        step1_md = _render_step1_clusters_md(step1)
        evidence_md = _format_evidence(step1.get("contexts", []))
        status = "⏳ STEP 1 completed. Running STEP 2..."
        pipeline_md = step1_md + "\n\n---\n\n" + _render_waiting("STEP 2 Waiting......")
        yield answer_md, evidence_md, status, "", pipeline_md, step1.get("contexts", []), empty_state_trace, *empty_btns
        # ---------- Step 2: answer generation ----------
        step2 = run_trustworthy_step2(
            question=q,
            doc_mode=doc_mode,
            contexts=step1.get("contexts", []),
            clusters=step1.get("clusters", []),
            gen_model=gen_model,
            api_key=backend_api_key,
        )
        answer_md = _format_answer(step2.get("answer", ""))
        step2_md = _render_step2_claims_md(step2)
        status = "⏳ STEP 2 completed. Running STEP 3..."
        pipeline_md = step1_md + "\n\n---\n\n" + step2_md + "\n\n---\n\n" + _render_waiting("STEP 3 Waiting......")
        yield answer_md, evidence_md, status, "", pipeline_md, step1.get("contexts", []), empty_state_trace, *empty_btns
        # ---------- Step 3: claim extractor ----------
        step3 = run_trustworthy_step3_claims(
            question=q,
            answer=step2.get("answer", ""),
            contexts=step1.get("contexts", []),
            doc_mode=doc_mode,
            gen_model=gen_model,
            api_key=backend_api_key,
        )
        step3_md = _render_step3_md(step3)
        step2_summary_md = _render_step2_summary_md(step2)
        final_trace = _prepare_claim_trace(step3)
        btn_updates = _claim_button_updates(final_trace)
        elapsed = time.perf_counter() - t0
        status = (
            "\u2705 ClimateRAG pipeline completed: "
            f"{len(step1.get('contexts', []))} passages, {len(step3.get('claims', []))} claims extracted."
        )
        timing_md = f"\u23f1\ufe0f **Elapsed:** `{elapsed:.2f}s`"
        pipeline_md = step1_md + "\n\n---\n\n" + step2_summary_md + "\n\n---\n\n" + step3_md
        yield answer_md, evidence_md, status, timing_md, pipeline_md, step1.get("contexts", []), final_trace, *btn_updates
        return
    except Exception as e:
        # Surface the failure in every output slot; elapsed time aids debugging.
        elapsed = time.perf_counter() - t0
        err = f"\u26a0\ufe0f ClimateRAG pipeline failed: {e}"
        timing_md = f"\u23f1\ufe0f **Elapsed before failure:** `{elapsed:.2f}s`"
        yield "*ClimateRAG pipeline failed.*", "", err, timing_md, f"{_render_waiting('Waiting......')}\n\n{err}", empty_state_contexts, empty_state_trace, *empty_btns
        return
def build_report_name_list():
    """Build a sorted, de-duplicated list of report names.

    Prefers names reported by ``list_reports()`` (local PDF metadata).
    When the Reports/ directory has been removed and that list is empty,
    falls back to the source names recorded in the chunk JSON files,
    trying the "structure" chunking first, then "length".

    Returns:
        list[str]: sorted unique, non-empty report names (may be empty).
    """
    reports = list_reports()
    names = sorted({
        str(r.get("name", "")).strip()
        for r in reports
        if isinstance(r, dict) and str(r.get("name", "")).strip()
    })
    if names:
        return names
    # Fallback: derive names from chunk JSON sources when Reports/ is removed.
    # The two chunking modes are tried in order; a failure on one mode is
    # best-effort and simply moves on to the next.
    for chunk_mode in ("structure", "length"):
        try:
            chunks = get_report_chunks(chunk_mode)
        except Exception:
            continue
        names = sorted({str(x).strip() for x in chunks.keys() if str(x).strip()})
        if names:
            return names
    return []
def render_report_names_md(names):
    """Render a Markdown bullet list of report names.

    Returns a muted placeholder sentence when *names* is empty or falsy;
    otherwise a "### Report Names (N)" heading followed by one backticked
    bullet per name.
    """
    if not names:
        return "_No report names found from local PDFs or chunk JSON sources._"
    header = f"### Report Names ({len(names)})"
    bullets = "\n".join(f"- `{name}`" for name in names)
    return f"{header}\n\n{bullets}"
# ======================== CSS ========================
# Stylesheet for the Gradio app. Sections, in order: global font override;
# hiding of Gradio's image toolbar and default footer; the `.hint-text`
# tip paragraph; the `.logo-header` banner; the mock-mode warning banner;
# the custom footer; the "waiting" banner shown between pipeline steps;
# the maturity card/table/badge styles used by the pipeline renderers;
# metric chips; confidence bars; and a <=900px responsive breakpoint.
# NOTE(review): this string should be injected via the gr.Blocks(css=...)
# constructor argument — confirm where it is actually passed.
CUSTOM_CSS = """
:root {
--font: "Segoe UI", Roboto, Helvetica, Arial, sans-serif !important;
}
html, body, button, input, textarea, select {
font-family: "Segoe UI", Roboto, Helvetica, Arial, sans-serif !important;
-webkit-font-smoothing: antialiased;
-moz-osx-font-smoothing: grayscale;
}
/* Hide image toolbar buttons */
.gradio-image button,
.gradio-image .absolute {
display: none !important;
}
footer { display: none !important; }
.built-with { display: none !important; }
.hint-text {
color: #666;
font-size: 0.95em;
margin-top: 2px;
margin-bottom: 8px;
width: 100%;
max-width: none;
line-height: 1.45;
white-space: normal;
}
.logo-header {
display: flex;
align-items: center;
justify-content: center;
padding: 20px 16px 12px 16px;
background: linear-gradient(180deg, #f0f7ff 0%, #ffffff 100%);
border-radius: 12px;
margin-bottom: 8px;
min-height: 124px;
}
.logo-header-text {
text-align: center;
width: 100%;
max-width: 900px;
}
.logo-header h2 {
margin: 0 0 2px 0;
color: #1a5276;
font-size: 1.45em;
letter-spacing: 0.02em;
}
.logo-header p {
color: #666;
font-size: 0.95em;
margin: 0;
}
.mock-banner {
background: #fff3cd;
border: 1px solid #ffc107;
border-radius: 8px;
padding: 12px 20px;
margin: 0 0 12px 0;
color: #856404;
font-size: 0.92em;
}
.custom-footer {
text-align: center;
padding: 14px 0;
color: #bbb;
font-size: 0.83em;
border-top: 1px solid #eee;
margin-top: 20px;
}
.waiting-banner {
font-size: 2rem;
font-weight: 700;
color: #d35400;
text-align: center;
}
.maturity-card {
border: 1px solid #dbe7f3;
border-radius: 12px;
padding: 14px;
background: linear-gradient(180deg, #f8fbff 0%, #ffffff 100%);
}
.maturity-card h3 {
margin: 0 0 12px 0;
color: #154360;
}
.maturity-card h4 {
margin: 10px 0 6px 0;
color: #1b4f72;
}
.maturity-table-wrap {
overflow-x: auto;
}
.maturity-table {
width: 100%;
border-collapse: collapse;
font-size: 0.95rem;
}
.maturity-table th, .maturity-table td {
border: 1px solid #d6e4f0;
padding: 10px;
vertical-align: top;
text-align: left;
line-height: 1.45;
}
.maturity-table thead th {
background: #eaf3ff;
}
.maturity-table tbody tr:nth-child(even) {
background: #fbfdff;
}
.maturity-badge {
display: inline-block;
padding: 2px 10px;
border-radius: 999px;
font-weight: 600;
font-size: 0.86rem;
}
.maturity-badge.level-high { background: #e8f8f0; color: #117864; }
.maturity-badge.level-moderate { background: #fff4e5; color: #9c640c; }
.maturity-badge.level-low { background: #fdecea; color: #922b21; }
.maturity-badge.level-insufficient { background: #fdecea; color: #922b21; }
.maturity-badge.level-unknown { background: #eef2f7; color: #34495e; }
.maturity-list {
margin: 0;
padding-left: 18px;
}
.maturity-list li {
margin: 0 0 6px 0;
}
.muted { color: #9aa5b1; }
.maturity-conclusion {
margin-top: 12px;
border-top: 1px dashed #cad9e8;
padding-top: 10px;
}
.maturity-conclusion h4 {
margin: 0 0 6px 0;
color: #1b4f72;
}
.maturity-conclusion p {
margin: 0;
}
.maturity-note {
margin-top: 10px;
padding: 8px 10px;
border: 1px dashed #cad9e8;
border-radius: 8px;
color: #516274;
background: #f8fbff;
}
.metric-chip-wrap {
margin-top: 10px;
}
.metric-chip {
display: inline-block;
margin: 4px 6px 0 0;
padding: 4px 10px;
border-radius: 999px;
border: 1px solid #c8ddf2;
background: #edf5ff;
color: #1b4f72;
font-size: 0.84rem;
}
.metric-chip.trend-up { background: #e8f8f0; border-color: #bfe8d3; color: #117864; }
.metric-chip.trend-down { background: #fdecea; border-color: #f3c6c2; color: #922b21; }
.metric-chip.trend-flat { background: #fff4e5; border-color: #f2ddba; color: #9c640c; }
.metric-chip.trend-unknown { background: #eef2f7; border-color: #d6dce3; color: #5d6d7e; }
.maturity-split {
display: grid;
grid-template-columns: 1fr 1fr;
gap: 12px;
margin-top: 10px;
}
.confidence-card {
margin-top: 12px;
border: 1px solid #dbe7f3;
border-radius: 10px;
padding: 10px 12px;
background: #f8fbff;
}
.confidence-card h4 {
margin: 0 0 8px 0;
color: #1b4f72;
}
.conf-row {
display: grid;
grid-template-columns: 70px 1fr 60px;
gap: 8px;
align-items: center;
margin: 6px 0;
}
.conf-label {
font-weight: 600;
color: #34495e;
}
.conf-bar {
height: 10px;
border-radius: 999px;
background: #e9eff6;
overflow: hidden;
}
.conf-fill {
height: 100%;
border-radius: 999px;
}
.conf-fill.conf-high { background: #27ae60; }
.conf-fill.conf-medium { background: #f39c12; }
.conf-fill.conf-low { background: #e74c3c; }
.conf-value {
text-align: right;
color: #566573;
font-variant-numeric: tabular-nums;
}
@media (max-width: 900px) {
.logo-header {
min-height: auto;
padding-top: 12px;
padding-bottom: 12px;
flex-direction: column;
}
.maturity-split {
grid-template-columns: 1fr;
}
}
"""
# ======================== Gradio UI ========================
# Fix: `css` and `theme` are gr.Blocks() constructor options; Gradio's
# launch() method does not accept them as keyword arguments, so they are
# applied here at construction time instead of at launch.
with gr.Blocks(
    title="Climate Disclosure RAG",
    css=CUSTOM_CSS,
    theme=gr.themes.Soft(primary_hue="blue", secondary_hue="slate"),
) as demo:
    # ---------- Header ----------
    gr.HTML(
        f"""
        <div class="logo-header">
        <div class="logo-header-text">
        <h2>Climate Disclosure RAG</h2>
        <p>AI-powered analysis of corporate sustainability & climate disclosures</p>
        </div>
        </div>
        """
    )
    # ==================== Tab 1: Question Answering ====================
    with gr.Tab("\U0001f50d Question Answering"):
        gr.Markdown("### \U0001f4ac Ask a Question About Sustainability Reports")
        with gr.Row():
            doc_mode_radio = gr.Radio(
                choices=["Multi-document", "Single-document"],
                value="Multi-document",
                label="Question Type",
                info="Single: ask about one report | Multi: compare across reports",
            )
        # Tip paragraph; toggled together with the question placeholder by
        # on_doc_mode_change when the question type changes.
        single_hint = gr.Markdown(
            '<p class="hint-text">'
            '\U0001f4a1 Tip: We recommend prefixing your question with the report name, '
            'e.g. <i>"For [Report 1 Name] and [Report 2 Name], does ...?"</i>'
            '</p>',
            visible=True,
        )
        question_box = gr.Textbox(
            label="Your Question",
            placeholder=PLACEHOLDER_MULTI,
            lines=3,
            max_lines=6,
            info='Please click "Use Example Question" to use the recommended question.',
        )
        use_example_btn = gr.Button("Use Example Question", variant="primary")
        gr.Markdown("#### \u2699\ufe0f Model Configuration")
        with gr.Row():
            with gr.Column(scale=1):
                rag_mode_dd = gr.Dropdown(
                    choices=["ClimateRAG", "Baseline RAG"],
                    value="ClimateRAG",
                    label="RAG Mode",
                )
            with gr.Column(scale=1):
                embed_model_dd = gr.Dropdown(
                    choices=EMBED_MODELS,
                    value=EMBED_MODELS[0],
                    label="\U0001f9e0 Embedding Model",
                )
            with gr.Column(scale=1):
                gen_model_dd = gr.Dropdown(
                    choices=GEN_MODELS,
                    value=(GEN_MODELS[0] if HAS_GPU else API_GEN_MODEL),
                    label="\U0001f916 Generation Model",
                )
        if not HAS_GPU:
            gr.Markdown(
                "<span style='color:#8a8a8a;'>GPU not detected: local generation models are disabled. "
                "Only API generation models are available.</span>"
            )
            gr.Markdown(
                "<span style='color:#b0b0b0;'>Disabled (GPU-only): "
                + ", ".join(GPU_GEN_MODELS)
                + "</span>"
            )
        # Initial visibility of the two API-key boxes is derived from the
        # default model selections; subsequent changes go through
        # on_model_selection_change below.
        default_gen_model = GEN_MODELS[0] if HAS_GPU else API_GEN_MODEL
        default_embed_model = EMBED_MODELS[0]
        default_need_openai_key = (
            (default_embed_model in OPENAI_EMBED_MODELS_SET)
            or (("(API)" in str(default_gen_model)) and (not _is_gemini_generation_model(default_gen_model)))
        )
        default_need_gemini_key = ("(API)" in str(default_gen_model)) and _is_gemini_generation_model(default_gen_model)
        openai_api_key_box = gr.Textbox(
            label="\U0001f511 OpenAI API Key",
            type="password",
            placeholder="sk-...",
            value=DEFAULT_OPENAI_API_KEY,
            visible=default_need_openai_key,
            info="Required for OpenAI embedding models and OpenAI API generation models.",
        )
        gemini_api_key_box = gr.Textbox(
            label="\U0001f511 Gemini API Key",
            type="password",
            placeholder="AIza...",
            value=DEFAULT_GEMINI_API_KEY,
            visible=default_need_gemini_key,
            info="Required for Gemini API generation models.",
        )
        top_k_slider = gr.Slider(
            minimum=1, maximum=20, value=5, step=1,
            label="\U0001f3af Top-K Retrieved Passages",
        )
        submit_btn = gr.Button("\U0001f680 Run Analysis", variant="primary", size="lg")
        status_md = gr.Markdown("")
        timing_md = gr.Markdown("")
        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("#### ClimateRAG Pipeline")
                pipeline_md = gr.Markdown(
                    value="*Three-step ClimateRAG pipeline output will appear here after Run.*",
                    sanitize_html=False,
                )
                gr.Markdown("#### Generated Answer")
                answer_box = gr.Markdown(
                    value="*Answer will appear here after you click Run.*",
                    sanitize_html=False,
                )
                gr.Markdown("#### Claim Trace (Click to Highlight Evidence)")
                with gr.Row():
                    claim_btn_1 = gr.Button("Claim 1", visible=False)
                    claim_btn_2 = gr.Button("Claim 2", visible=False)
                    claim_btn_3 = gr.Button("Claim 3", visible=False)
                    clear_highlight_btn = gr.Button("Clear Highlight", visible=True)
            with gr.Column(scale=1):
                gr.Markdown("#### Retrieved Evidence")
                evidence_box = gr.Markdown(
                    value="*Results will appear here after you click Run.*",
                    sanitize_html=False,
                )
        # Session state carried between callbacks: retrieved contexts and
        # the claim -> evidence trace produced by the pipeline.
        contexts_state = gr.State([])
        claim_trace_state = gr.State([])
        # ---- Wiring ----
        doc_mode_radio.change(
            fn=on_doc_mode_change,
            inputs=[doc_mode_radio],
            outputs=[question_box, single_hint],
        )
        use_example_btn.click(
            fn=lambda mode: PLACEHOLDER_SINGLE if mode == "Single-document" else PLACEHOLDER_MULTI,
            inputs=[doc_mode_radio],
            outputs=[question_box],
        )
        gen_model_dd.change(
            fn=on_model_selection_change,
            inputs=[gen_model_dd, embed_model_dd],
            outputs=[openai_api_key_box, gemini_api_key_box],
            queue=False,
        )
        embed_model_dd.change(
            fn=on_model_selection_change,
            inputs=[gen_model_dd, embed_model_dd],
            outputs=[openai_api_key_box, gemini_api_key_box],
            queue=False,
        )
        demo.load(
            fn=on_model_selection_change,
            inputs=[gen_model_dd, embed_model_dd],
            outputs=[openai_api_key_box, gemini_api_key_box],
            queue=False,
        )
        # Run: on_run_start resets the status widgets synchronously, then
        # do_query streams pipeline results.
        submit_btn.click(
            fn=on_run_start,
            outputs=[status_md, timing_md, pipeline_md],
            queue=False,
        ).then(
            fn=do_query,
            inputs=[
                question_box, doc_mode_radio, rag_mode_dd,
                embed_model_dd, gen_model_dd,
                openai_api_key_box, gemini_api_key_box, top_k_slider,
            ],
            outputs=[
                answer_box, evidence_box, status_md, timing_md, pipeline_md,
                contexts_state, claim_trace_state,
                claim_btn_1, claim_btn_2, claim_btn_3,
            ],
        )
        claim_btn_1.click(
            fn=lambda ctx, trace: on_claim_click(0, ctx, trace),
            inputs=[contexts_state, claim_trace_state],
            outputs=[evidence_box],
            queue=False,
        )
        claim_btn_2.click(
            fn=lambda ctx, trace: on_claim_click(1, ctx, trace),
            inputs=[contexts_state, claim_trace_state],
            outputs=[evidence_box],
            queue=False,
        )
        claim_btn_3.click(
            fn=lambda ctx, trace: on_claim_click(2, ctx, trace),
            inputs=[contexts_state, claim_trace_state],
            outputs=[evidence_box],
            queue=False,
        )
        clear_highlight_btn.click(
            fn=clear_claim_highlight,
            inputs=[contexts_state],
            outputs=[evidence_box],
            queue=False,
        )
    # ---- Tab 2: Document Library ----
    with gr.Tab("\U0001f4da Document Library"):
        gr.Markdown(
            "### Sustainability Report Collection\n"
            "Direct PDF download is disabled in this Space. "
            "Use the official GitHub link to access report files."
        )
        gr.Markdown(f"Report download link: [ClimRetrieve Reports]({REPORTS_GITHUB_URL})")
        report_names = build_report_name_list()
        gr.Markdown(render_report_names_md(report_names))
    # ==================== Tab 3: About ====================
    with gr.Tab("ℹ️ About"):
        gr.Markdown("""
### ClimateRAG — Climate Disclosure Retrieval-Augmented Generation for Evidence-based Question-Answering
Increasingly stringent global regulations require companies to provide detailed and auditable climate-related disclosures. These reports are often lengthy and visually complex, making manual analysis challenging for regulators and auditors who require precise evidence grounding rather than free-form answers.
ClimateRAG is a structured processing and reasoning framework designed for automated climate disclosure analysis. The system integrates hierarchical document chunking, an agent-based reasoning pipeline, and a claim extractor module to produce traceable, evidence-linked, and auditable outputs. It supports both single-document and multi-document analysis scenarios.
We additionally introduce a dataset of 367 expert-annotated question–answer pairs covering realistic regulatory and audit workflows. Experimental evaluation demonstrates the effectiveness and efficiency of the proposed framework for climate disclosure analysis.
The goal of ClimateRAG is to bridge Large Language Models with the rigorous standards required in regulatory auditing and sustainability reporting.
---
### Key Contributions
1. We develop ClimateRAG, the first system specifically designed for auditable and evidence-linked climate disclosure analysis with multi-document reasoning capability.
2. We construct a dataset of 367 annotated QA pairs spanning single-document and cross-document settings, aligned with real-world regulatory and auditing scenarios.
3. We conduct systematic evaluation to assess both retrieval and generation performance, validating the robustness and practical utility of the system.
---
### Project Website
https://cheng-tf.github.io/ClimateRAG/
""")
    # ---------- Custom Footer ----------
    gr.HTML(
        '<div class="custom-footer">'
        'Built with Gradio \u00b7 Powered by Climate Disclosure RAG \u00b7 \u00a9 2026'
        '</div>'
    )
# ======================== Launch ========================
if __name__ == "__main__":
    # Host/port/root-path/share are configurable through environment
    # variables so the same script works locally and on hosted Spaces.
    server_name = os.getenv("APP_HOST", "0.0.0.0")
    server_port = int(os.getenv("APP_PORT", "7860"))
    root_path = os.getenv("APP_ROOT_PATH", "")
    share = os.getenv("APP_SHARE", "false").lower() in {"1", "true", "yes", "y"}
    # Only expose directories that actually exist on disk.
    allowed_paths = [
        p for p in (REPORTS_DIR, SCRIPT_DIR)
        if isinstance(p, str) and os.path.exists(p)
    ]
    # Fix: `css` and `theme` are gr.Blocks() constructor options, not
    # launch() keyword arguments — passing them here raises
    # "launch() got an unexpected keyword argument", so they are omitted.
    launch_kwargs = dict(
        server_name=server_name,
        server_port=server_port,
        share=share,
        show_error=True,
        root_path=root_path or None,
    )
    if allowed_paths:
        launch_kwargs["allowed_paths"] = allowed_paths
    demo.launch(**launch_kwargs)