# Provenance: Hugging Face Space page residue (author: tengfeiCheng, "update", commit 03cb531)
"""
Climate Disclosure RAG
==================================
Gradio Web Interface for sustainability report QA.
Launch:
python app.py
"""
import os
import sys
import json
import time
import re
import html
from urllib.parse import quote
import gradio_client.utils as _gcu

# Patch gradio_client's schema-to-type converter: JSON Schema permits a bare
# boolean (`true`/`false`) as a schema, but the stock converter assumes a dict
# and crashes. The patch is installed before `import gradio` below picks it up.
_orig_json_schema_fn = _gcu._json_schema_to_python_type


def _safe_json_schema_to_python_type(schema, defs=None):
    """Return "Any" for boolean schemas; otherwise defer to the original converter."""
    if isinstance(schema, bool):
        return "Any"
    return _orig_json_schema_fn(schema, defs)


_gcu._json_schema_to_python_type = _safe_json_schema_to_python_type
import gradio as gr
# Make sibling modules (rag_app_backend) importable when the app is launched
# directly with `python app.py` from another working directory.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
if SCRIPT_DIR not in sys.path:
    sys.path.insert(0, SCRIPT_DIR)
from rag_app_backend import (
run_rag,
run_trustworthy_step1,
run_trustworthy_step2,
run_trustworthy_step3_claims,
API_GEN_MODEL_ALIASES,
list_reports,
HAS_GPU,
OPENAI_EMBED_MODELS,
MOCK_MODE,
REPORTS_DIR,
get_report_chunks,
)
# ======================== Constants ========================

# Example questions shown as placeholders (single-report vs multi-report mode).
PLACEHOLDER_SINGLE = (
    "For 2022 Microsoft Environmental Sustainability Report, "
    "do the environmental/sustainability targets set by the company "
    "reference external climate change adaptation goals/targets?"
)
PLACEHOLDER_MULTI = (
    'For "Does the company encourage downstream partners to carry out climate-related '
    'risk assessments?", is Boeing 2023 Sustainability Report better than '
    'AT&T 2022 Sustainability Summary in disclosure quality?'
)
REPORTS_GITHUB_URL = "https://github.com/tobischimanski/ClimRetrieve/tree/main/Reports"

# Embedding models: API/CPU options are always offered; local Qwen embedding
# models are only added when a GPU is available.
CPU_EMBED_MODELS = [
    "BM25",
    "text-embedding-3-large",
    "text-embedding-3-small",
    "text-embedding-ada-002",
]
GPU_EMBED_MODELS = [
    "Qwen3-Embedding-0.6B",
    "Qwen3-Embedding-4B",
]
EMBED_MODELS = (CPU_EMBED_MODELS + GPU_EMBED_MODELS) if HAS_GPU else CPU_EMBED_MODELS

# Generation models: the local FP8 model requires a GPU; API aliases come from
# the backend's API_GEN_MODEL_ALIASES mapping.
GPU_GEN_MODELS = [
    "Qwen3-4B-Instruct-2507-FP8",
]
API_GEN_MODELS = list(API_GEN_MODEL_ALIASES.keys())
API_GEN_MODEL = API_GEN_MODELS[0] if API_GEN_MODELS else "GPT-5-mini (API)"
GEN_MODELS = GPU_GEN_MODELS + API_GEN_MODELS if HAS_GPU else API_GEN_MODELS
OPENAI_EMBED_MODELS_SET = set(OPENAI_EMBED_MODELS)

# API keys read from the environment; the *_88996 variable is a fallback alias.
DEFAULT_OPENAI_API_KEY = (
    os.getenv("OPENAI_API_KEY", "").strip()
    or os.getenv("OPENAI_API_KEY_88996", "").strip()
)
DEFAULT_GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "").strip()
def _is_gemini_generation_model(gen_model: str) -> bool:
text = str(gen_model or "").strip().upper()
return "GEMINI" in text
# ======================== Helpers ========================
# Cache of report filename -> page count so each PDF is opened at most once.
_pdf_page_count_cache = {}


def _get_pdf_total_pages(report_name: str) -> int:
    """Return the number of pages in REPORTS_DIR/report_name (minimum 1).

    Best-effort: falls back to 1 when the name is empty, no PDF backend
    (pypdf/PyPDF2) is importable, or the file cannot be parsed.
    """
    if not report_name:
        return 1
    if report_name in _pdf_page_count_cache:
        return _pdf_page_count_cache[report_name]
    pdf_path = os.path.join(REPORTS_DIR, report_name)
    total = 1
    try:
        try:
            from pypdf import PdfReader  # modern package name
        except Exception:
            from PyPDF2 import PdfReader  # legacy fallback
        reader = PdfReader(pdf_path)
        total = max(1, len(reader.pages))
    except Exception:
        total = 1  # unreadable/corrupt PDFs count as a single page
    _pdf_page_count_cache[report_name] = total
    return total
def _pdf_iframe(report_name: str, page: int = 1) -> str:
    """Return an <iframe> embedding REPORTS_DIR/report_name opened at `page`.

    The page number is coerced defensively: non-numeric input (None, "",
    arbitrary strings arriving from UI state) falls back to page 1 instead
    of raising ValueError/TypeError as the previous `int(page)` did.
    """
    try:
        page_num = max(1, int(page))
    except (TypeError, ValueError):
        page_num = 1
    pdf_path = os.path.abspath(os.path.join(REPORTS_DIR, report_name)).replace("\\", "/")
    # Gradio serves local files under the /file= route; keep '/' and ':' unescaped.
    pdf_url = f"/file={quote(pdf_path, safe='/:')}#page={page_num}&view=FitH"
    return (
        f'<iframe src="{pdf_url}" '
        f'width="100%" height="720" '
        f'style="border:1px solid #ddd; border-radius:8px;" '
        f'type="application/pdf">'
        f'<p>PDF preview is not supported in this browser. Please download the file.</p>'
        f'</iframe>'
    )
def _format_evidence(contexts, highlight_keys=None, highlight_color: str = "#ff7043"):
highlight_set = set(highlight_keys or [])
medals = {0: "\U0001F947", 1: "\U0001F948", 2: "\U0001F949"}
parts = []
for i, c in enumerate(contexts):
badge = medals.get(i, f"#{i+1}")
report_short = c["report"].replace(".pdf", "")
score = c["score"]
page = c.get("page", None)
key = (str(c.get("report", "")), str(c.get("chunk_idx", "")))
text_body = str(c.get("text", ""))[:800]
if key in highlight_set:
safe_text = html.escape(text_body)
text_body = (
f"<div style=\"background:{highlight_color}33;border-left:6px solid {highlight_color};"
f"padding:8px 10px;border-radius:8px;\">{safe_text}</div>"
)
parts.append(
f"### {badge} {report_short}\n"
f"**Similarity:** {score:.4f} | "
f"**Chunk:** {c['chunk_idx']}"
+ (f", **Page:** {page}" if page not in (None, "", "NA") else "")
+ "\n\n"
f"{text_body}"
)
return "\n\n---\n\n".join(parts) if parts else "No evidence retrieved."
def _render_waiting(text: str) -> str:
return f"<div class='waiting-banner'>{text}</div>"
def _preview_chunk_text(text: str, max_sentences: int = 2, max_chars: int = 260) -> str:
raw = str(text or "").replace("\n", " ").replace("\r", " ").strip()
if not raw:
return ""
sentences = [s.strip() for s in re.split(r"(?<=[.!?])\s+", raw) if s.strip()]
if sentences:
preview = " ".join(sentences[:max_sentences]).strip()
else:
preview = raw[:max_chars].strip()
if len(preview) > max_chars:
preview = preview[:max_chars].rstrip()
if len(preview) < len(raw):
return preview + "..."
return preview
def _render_step1_clusters_md(step1: dict) -> str:
    """Render Step-1 retrieval stats and evidence clusters as markdown."""
    # Tolerate a non-dict payload (e.g. a backend error) by rendering empty stats.
    contexts = step1.get("contexts", []) if isinstance(step1, dict) else []
    avg_sim = float(step1.get("average_similarity", 0.0) or 0.0) if isinstance(step1, dict) else 0.0
    clusters = step1.get("clusters", []) if isinstance(step1, dict) else []
    lines = [
        "## STEP 1 - RETRIEVAL & EVIDENCE CLUSTERS",
        f"- Retrieved **{len(contexts)}** paragraphs",
        f"- Clustered into **{len(clusters)}** groups",
        f"- Average similarity: **{avg_sim:.2f}**",
        "- Note: Average similarity is the mean of retrieved Similarity scores.",
        "",
    ]
    if not clusters:
        lines.append("_No clusters returned._")
        return "\n".join(lines)
    for i, c in enumerate(clusters):
        cname = str(c.get("cluster_name", f"Cluster {i + 1}")).strip()
        csum = str(c.get("summary", "")).strip()
        # Label clusters A, B, C, ... wrapping back to A after Z.
        lines.append(f"### Cluster {chr(65 + (i % 26))} - {cname}")
        if csum:
            lines.append(f"Summary: {csum}")
        evs = c.get("evidence", [])
        if isinstance(evs, list) and evs:
            for ev in evs:
                chunk_idx = ev.get("chunk_idx", "NA")
                preview = _preview_chunk_text(str(ev.get("text", "")), max_sentences=2, max_chars=320)
                lines.append(f"- Chunk {chunk_idx}: {preview}")
        else:
            lines.append("- No linked evidence items")
        lines.append("")
    return "\n".join(lines).strip()
def _render_step2_claims_md(step2: dict) -> str:
    """Render the Step-2 generated answer as a markdown section."""
    if not isinstance(step2, dict):
        return "## STEP 2 - ANSWER GENERATION\n_No generation output._"
    answer = str(step2.get("answer", "")).strip()
    lines = ["## STEP 2 - ANSWER GENERATION", ""]
    if not answer:
        lines.append("- Warning: no answer returned from Step 2.")
    else:
        lines.extend(["### Generated Answer", _format_answer(answer)])
    return "\n".join(lines)
def _render_step2_summary_md(step2: dict) -> str:
if not isinstance(step2, dict):
return "## STEP 2 - ANSWER GENERATION\n_No generation output._"
answer_ready = str(step2.get("answer", "")).strip()
if not answer_ready:
return "## STEP 2 - ANSWER GENERATION\n- Warning: no answer returned from Step 2."
return "\n".join(
[
"## STEP 2 - ANSWER GENERATION",
"- Generated answer is shown in the **Generated Answer** panel.",
]
)
def _render_step3_md(step3: dict) -> str:
return "\n".join(
[
"## STEP 3 - CLAIM EXTRACTOR",
"",
"Use the **Claim Trace** buttons below to inspect full claims and highlight linked evidence chunks.",
]
)
CLAIM_COLORS = ["#ff7043", "#ffd54f", "#4fc3f7"]
def _prepare_claim_trace(step3: dict):
claim_links = step3.get("claim_links", []) if isinstance(step3, dict) else []
if not isinstance(claim_links, list):
claim_links = []
trace = []
for row in claim_links[:3]:
if not isinstance(row, dict):
continue
claim_text = str(row.get("claim", "")).strip()
evs = row.get("evidence", [])
if not isinstance(evs, list):
evs = []
keys = []
chunk_refs = []
for ev in evs:
if not isinstance(ev, dict):
continue
report = str(ev.get("report", ""))
chunk_idx = str(ev.get("chunk_idx", ""))
keys.append((report, chunk_idx))
if report or chunk_idx:
report_short = report.replace(".pdf", "").strip() if report else ""
if report_short and chunk_idx:
chunk_refs.append(f"{report_short} chunk {chunk_idx}")
elif chunk_idx:
chunk_refs.append(f"chunk {chunk_idx}")
elif report_short:
chunk_refs.append(report_short)
trace.append(
{
"label": claim_text or "Claim",
"keys": keys,
"chunk_refs": chunk_refs,
"score": None,
}
)
return trace
def _default_claim_button_updates():
    """Hidden placeholder states for the three claim-trace buttons."""
    return [gr.update(value=f"Claim {n}", visible=False) for n in (1, 2, 3)]
def _claim_button_updates(trace):
    """Build gr.update objects for the three claim buttons from a claim trace.

    Buttons beyond len(trace) are hidden; visible buttons show a colored
    square, the claim index, an optional score, linked chunk refs, and the
    claim label.
    """
    updates = []
    for i in range(3):
        if i < len(trace):
            label = str(trace[i].get("label", f"Claim {i+1}")).strip()
            score = trace[i].get("score", None)
            prefix = ["🟧", "🟨", "🟦"][i]  # square colors match CLAIM_COLORS order
            refs = trace[i].get("chunk_refs", []) if isinstance(trace[i], dict) else []
            ref_text = ", ".join(refs) if refs else "no chunk"
            if score is None:
                text = f"{prefix} Claim {i+1} [{ref_text}]: {label}"
            else:
                text = f"{prefix} Claim {i+1} ({score:.2f}) [{ref_text}]: {label}"
            updates.append(gr.update(value=text, visible=True))
        else:
            updates.append(gr.update(value=f"Claim {i+1}", visible=False))
    return updates
def on_claim_click(claim_idx, contexts_state, claim_trace_state):
    """Highlight the evidence chunks linked to the clicked claim.

    Falls back to unhighlighted evidence when the index is out of range or —
    fix — when `claim_idx` is not convertible to int (state can arrive
    malformed from the UI; the previous bare `int(claim_idx)` raised).
    """
    contexts = contexts_state if isinstance(contexts_state, list) else []
    trace = claim_trace_state if isinstance(claim_trace_state, list) else []
    try:
        idx = int(claim_idx)
    except (TypeError, ValueError):
        return _format_evidence(contexts)
    if idx < 0 or idx >= len(trace):
        return _format_evidence(contexts)
    keys = trace[idx].get("keys", []) if isinstance(trace[idx], dict) else []
    color = CLAIM_COLORS[idx % len(CLAIM_COLORS)]
    return _format_evidence(contexts, highlight_keys=keys, highlight_color=color)
def clear_claim_highlight(contexts_state):
    """Re-render the evidence panel with no claim highlighting applied."""
    if isinstance(contexts_state, list):
        return _format_evidence(contexts_state)
    return _format_evidence([])
def _md_cell(value) -> str:
text = "" if value is None else str(value)
text = text.replace("\n", " ").replace("\r", " ").strip()
return text.replace("|", "\\|")
def _md_table(rows, columns) -> str:
    """Render a list of row dicts as a GitHub-style markdown table.

    Returns "" when `rows` is empty. Fix: rows that are not dicts are now
    skipped instead of raising AttributeError on `row.get`, matching the
    defensive isinstance-guarding used throughout this module.
    """
    if not rows:
        return ""
    header = "| " + " | ".join(columns) + " |"
    sep = "| " + " | ".join(["---"] * len(columns)) + " |"
    body = []
    for row in rows:
        if not isinstance(row, dict):
            continue
        body.append("| " + " | ".join(_md_cell(row.get(c, "")) for c in columns) + " |")
    return "\n".join([header, sep] + body)
def _truncate(text: str, max_len: int = 180) -> str:
    """Sanitize via _md_cell and cap at max_len with a trailing ellipsis."""
    cell = _md_cell(text)
    if len(cell) <= max_len:
        return cell
    return cell[: max_len - 3] + "..."
def _shorten_text(text: str, max_len: int = 240) -> str:
text = "" if text is None else str(text)
text = text.replace("\n", " ").replace("\r", " ").strip()
return text if len(text) <= max_len else text[: max_len - 3] + "..."
def _pretty_company_name(raw: str) -> str:
if not raw:
return ""
name = raw.replace("_", " ").strip()
words = [w.upper() if w.lower() in {"esg"} else w.capitalize() for w in name.split()]
return " ".join(words)
def _escape(text) -> str:
return html.escape("" if text is None else str(text))
def _extract_number_snippets(texts, max_items: int = 3):
    """Pull up to max_items short snippets around quantitative mentions.

    Scans each text for a number followed by a unit-like token (%, percent,
    billion, million, thousand, mt, tCO2e, ton(s), year(s)) and collects an
    ~80-character context window around each match. Returns [] for non-list
    input.
    """
    snippets = []
    if not isinstance(texts, list):
        return snippets
    pattern = re.compile(
        r"\b\d+(?:\.\d+)?\s?(?:%|percent|billion|million|thousand|mt|tco2e|tons?|years?)\b",
        flags=re.IGNORECASE,
    )
    for t in texts:
        s = str(t or "")
        for m in pattern.finditer(s):
            # 40 characters of surrounding context on each side of the match.
            start = max(0, m.start() - 40)
            end = min(len(s), m.end() + 40)
            snippets.append(_shorten_text(s[start:end], 120))
            if len(snippets) >= max_items:
                return snippets
    return snippets
def _normalize_confidence_distribution(raw):
if not isinstance(raw, dict):
return None
out = {}
for key in ("high", "medium", "low"):
v = raw.get(key, None)
if v is None:
continue
try:
if isinstance(v, str):
s = v.strip()
if s.endswith("%"):
val = float(s[:-1]) / 100.0
else:
val = float(s)
else:
val = float(v)
if val > 1.0 and val <= 100.0:
val = val / 100.0
val = max(0.0, min(1.0, val))
out[key] = val
except Exception:
continue
if not out:
return None
for key in ("high", "medium", "low"):
out.setdefault(key, 0.0)
total = out["high"] + out["medium"] + out["low"]
if total <= 0:
return None
# Normalize to sum=1 for stable UI display.
out["high"] = out["high"] / total
out["medium"] = out["medium"] / total
out["low"] = out["low"] / total
return out
def _extract_confidence_distribution_from_text(text: str):
    """Find a confidence distribution embedded in model output text.

    Returns (distribution_or_None, cleaned_text). First tries an inline
    "confidence distribution: {...}" JSON blob — on success the blob is
    removed from the cleaned text. Otherwise falls back to loose
    "high: 40%" style matches, which are left in place.
    """
    raw = str(text or "")
    cleaned = raw
    m = re.search(r"confidence\s*distribution\s*:\s*(\{[^{}]+\})", raw, flags=re.IGNORECASE)
    if m:
        blob = m.group(1)
        parsed = None
        # Models often emit single-quoted pseudo-JSON; retry with quotes swapped.
        for cand in (blob, blob.replace("'", '"')):
            try:
                parsed = json.loads(cand)
                break
            except Exception:
                continue
        conf = _normalize_confidence_distribution(parsed)
        if conf:
            cleaned = raw.replace(m.group(0), "").strip()
            return conf, cleaned
    # Fallback: scrape individual "high/medium/low: <number>[%]" mentions.
    tmp = {}
    for k in ("high", "medium", "low"):
        mk = re.search(rf"\b{k}\b\s*[:=]\s*([0-9]+(?:\.[0-9]+)?)\s*(%)?", raw, flags=re.IGNORECASE)
        if not mk:
            continue
        num = float(mk.group(1))
        if mk.group(2):
            num = num / 100.0
        tmp[k] = num
    conf = _normalize_confidence_distribution(tmp if tmp else None)
    return conf, cleaned
def _render_confidence_distribution(raw) -> str:
    """Render a confidence distribution as an HTML bar card ("" when absent)."""
    conf = _normalize_confidence_distribution(raw)
    if not conf:
        return ""

    def _row(label: str, key: str, cls: str) -> str:
        # One labelled bar; the fill width encodes the percentage.
        pct = conf[key] * 100.0
        return (
            "<div class='conf-row'>"
            f"<div class='conf-label'>{label}</div>"
            "<div class='conf-bar'>"
            f"<div class='conf-fill {cls}' style='width:{pct:.1f}%'></div>"
            "</div>"
            f"<div class='conf-value'>{pct:.1f}%</div>"
            "</div>"
        )

    return (
        "<div class='confidence-card'>"
        "<h4>Confidence Distribution</h4>"
        f"{_row('High', 'high', 'conf-high')}"
        f"{_row('Medium', 'medium', 'conf-medium')}"
        f"{_row('Low', 'low', 'conf-low')}"
        "</div>"
    )
def _attach_confidence_block(card_html: str, parsed: dict) -> str:
    """Append the confidence-distribution card inside an existing HTML card.

    When card_html ends with a closing </div>, the block is inserted just
    before it; otherwise it is appended at the end.
    """
    conf_html = _render_confidence_distribution(parsed.get("confidence_distribution"))
    if not conf_html:
        return card_html
    text = str(card_html or "")
    stripped = text.rstrip()
    if stripped.endswith("</div>"):
        return stripped[: -len("</div>")] + conf_html + "</div>"
    return text + conf_html
def _normalize_skill_for_render(parsed: dict) -> str:
    """Map a model-reported `skill` string onto a canonical skill name.

    Matching is keyword-based and order-sensitive. When `skill` is missing
    or unrecognized, the output's schema keys are used to guess the skill.
    Returns "" when nothing matches.
    """
    skill = str(parsed.get("skill", "")).strip().lower()
    if "trend" in skill and "quant" in skill:
        return "Trend & Quant Comparator"
    if "attainment" in skill or ("delta" in skill and "benchmark" in skill):
        return "Target Attainment & Delta Benchmark"
    if "compliance" in skill and "check" in skill:
        return "Compliance Checklist"
    if "dimension" in skill and "extract" in skill:
        return "Dimension Extractor"
    if "contradiction" in skill or "consistency" in skill:
        return "Contradiction/Consistency Check"
    if "consensus" in skill or "portfolio" in skill or "count" in skill:
        return "Consensus/Count (Portfolio Statistics)"
    if "comparative" in skill or "table" in skill:
        return "Comparative Table Builder"
    # Fallback by schema keys when model misses `skill`.
    if isinstance(parsed.get("required_checks"), list) and isinstance(parsed.get("reports"), list):
        return "Compliance Checklist"
    if isinstance(parsed.get("counts"), dict) and isinstance(parsed.get("per_report"), list):
        return "Consensus/Count (Portfolio Statistics)"
    if isinstance(parsed.get("checks"), list) and isinstance(parsed.get("scores"), dict):
        return "Contradiction/Consistency Check"
    reports = parsed.get("reports")
    if isinstance(reports, list) and reports:
        # Inspect the shape of the first report entry to infer the skill.
        first = reports[0] if isinstance(reports[0], dict) else {}
        if isinstance(first.get("benchmarks"), list):
            return "Target Attainment & Delta Benchmark"
        if isinstance(first.get("quant_metrics"), list) and "strength_score" in first:
            return "Trend & Quant Comparator"
        if isinstance(first.get("bucket_counts"), dict):
            return "Dimension Extractor"
        if "maturity_level" in first or "comparison_metrics" in parsed:
            return "Comparative Table Builder"
    # Single-report payloads with comparison-style keys also render as a table.
    if str(parsed.get("report", "")).strip() and any(k in parsed for k in ("maturity_level", "key_evidence", "quant_metrics", "comparison_metrics", "year")):
        return "Comparative Table Builder"
    return ""
def _render_quant_metrics_matrix(reports: list, report_key: str = "report", metrics_key: str = "quant_metrics") -> str:
    """Render a metric-by-report HTML table from per-report quantitative metrics.

    Each report dict may carry a list of {metric, value, unit, period} items;
    missing cells render as "N/A". Returns "" when `reports` is empty/invalid
    and a note when no metrics are found at all.
    """
    if not isinstance(reports, list) or not reports:
        return ""
    cols = []
    metric_map = {}  # metric name -> {report name -> formatted cell}
    for r in reports:
        if not isinstance(r, dict):
            continue
        name = str(r.get(report_key, "Unknown")).strip() or "Unknown"
        cols.append(name)
        qms = r.get(metrics_key, [])
        if not isinstance(qms, list):
            continue
        for m in qms:
            if not isinstance(m, dict):
                continue
            metric = str(m.get("metric", "")).strip()
            if not metric:
                continue
            value = m.get("value", None)
            unit = m.get("unit", None)
            period = m.get("period", None)
            # Cell format: "<value> <unit> (<period>)", each part optional.
            if value is None or str(value).strip() == "":
                cell = "N/A"
            else:
                cell = str(value)
                if unit not in (None, ""):
                    cell += f" {unit}"
                if period not in (None, ""):
                    cell += f" ({period})"
            metric_map.setdefault(metric, {})[name] = cell
    if not metric_map:
        return "<div class='maturity-note'>No explicit quantitative metrics found in the output.</div>"
    head_cols = "".join(f"<th>{_escape(c)}</th>" for c in cols)
    body_rows = []
    for metric in sorted(metric_map.keys()):
        row_cells = "".join(f"<td>{_escape(metric_map[metric].get(c, 'N/A'))}</td>" for c in cols)
        body_rows.append(f"<tr><th>{_escape(metric)}</th>{row_cells}</tr>")
    return (
        "<h4>Quantitative Comparison</h4>"
        "<div class='maturity-table-wrap'>"
        "<table class='maturity-table'>"
        f"<thead><tr><th>Metric</th>{head_cols}</tr></thead>"
        f"<tbody>{''.join(body_rows)}</tbody>"
        "</table></div>"
    )
def _render_maturity_comparison_html(parsed: dict) -> str:
    """Render the `maturity_comparison` mapping as an HTML comparison card.

    One column per company; rows: maturity badge, up to two evidence
    bullets, and numeric snippets mined from evidence + rationale.
    Returns "" when the mapping is absent or empty.
    """
    mc = parsed.get("maturity_comparison")
    if not isinstance(mc, dict) or not mc:
        return ""
    companies = list(mc.keys())
    rows_html = []
    level_cells = []
    evidence_cells = []
    quant_cells = []
    for k in companies:
        item = mc.get(k, {}) if isinstance(mc.get(k), dict) else {}
        level = str(item.get("maturity_level", "unknown")).strip()
        low = level.lower()
        suffix = " x" if low == "insufficient" else ""  # visual marker for "insufficient"
        badge_cls = f"level-{low}" if low in {"high", "moderate", "low", "insufficient"} else "level-unknown"
        level_cells.append(f'<td><span class="maturity-badge {badge_cls}">{_escape(level + suffix)}</span></td>')
        evidence = item.get("evidence", [])
        if isinstance(evidence, list) and evidence:
            bullets = "".join(f"<li>{_escape(_shorten_text(e, 180))}</li>" for e in evidence[:2])
            evidence_cells.append(f"<td><ul class='maturity-list'>{bullets}</ul></td>")
        else:
            evidence_cells.append("<td><span class='muted'>N/A x</span></td>")
        rationale = item.get("rationale", "")
        # Mine short numeric snippets from evidence + rationale for the quant row.
        snippets = _extract_number_snippets((evidence if isinstance(evidence, list) else []) + [rationale], max_items=3)
        if snippets:
            quant_cells.append("<td><ul class='maturity-list'>" + "".join(f"<li>{_escape(s)}</li>" for s in snippets) + "</ul></td>")
        else:
            quant_cells.append("<td><span class='muted'>No numeric metric found</span></td>")
    rows_html.append("<tr><th>Maturity Level</th>" + "".join(level_cells) + "</tr>")
    rows_html.append("<tr><th>Key Evidence</th>" + "".join(evidence_cells) + "</tr>")
    rows_html.append("<tr><th>Quant Metrics</th>" + "".join(quant_cells) + "</tr>")
    header_cells = "".join(f"<th>{_escape(_pretty_company_name(c))}</th>" for c in companies)
    conclusion = parsed.get("conclusion", "")
    conclusion_html = ""
    if isinstance(conclusion, str) and conclusion.strip():
        conclusion_html = (
            "<div class='maturity-conclusion'>"
            "<h4>Conclusion</h4>"
            f"<p>{_escape(conclusion.strip())}</p>"
            "</div>"
        )
    return (
        "<div class='maturity-card'>"
        "<h3>Company Maturity Comparison</h3>"
        "<div class='maturity-table-wrap'>"
        "<table class='maturity-table'>"
        "<thead><tr><th>Attribute</th>"
        f"{header_cells}"
        "</tr></thead>"
        "<tbody>"
        f"{''.join(rows_html)}"
        "</tbody></table></div>"
        f"{conclusion_html}"
        "</div>"
    )
def _render_comparative_table_builder_html(parsed: dict) -> str:
    """Render "Comparative Table Builder" output (per-report list) as an HTML card.

    Rows: maturity badge and up to two key-evidence bullets per report, plus
    optional compared-metric chips, a quantitative matrix, and a conclusion.
    Returns "" when `reports` is empty/invalid.
    """
    reports = parsed.get("reports", [])
    if not isinstance(reports, list) or not reports:
        return ""
    header_cells = "".join(f"<th>{_escape(r.get('report', 'Unknown'))}</th>" for r in reports if isinstance(r, dict))
    if not header_cells:
        return ""
    maturity_cells = []
    evidence_cells = []
    for r in reports:
        if not isinstance(r, dict):
            continue
        level = str(r.get("maturity_level", "unknown")).strip()
        low = level.lower()
        suffix = " x" if low == "insufficient" else ""  # visual marker for "insufficient"
        cls = f"level-{low}" if low in {"high", "moderate", "low", "insufficient"} else "level-unknown"
        maturity_cells.append(f'<td><span class="maturity-badge {cls}">{_escape(level + suffix)}</span></td>')
        evidence = r.get("key_evidence", [])
        if isinstance(evidence, list) and evidence:
            bullets = "".join(f"<li>{_escape(_shorten_text(e, 180))}</li>" for e in evidence[:2])
            evidence_cells.append(f"<td><ul class='maturity-list'>{bullets}</ul></td>")
        else:
            evidence_cells.append("<td><span class='muted'>N/A</span></td>")
    compare_metrics = parsed.get("comparison_metrics", [])
    compare_html = ""
    if isinstance(compare_metrics, list) and compare_metrics:
        chips = "".join(f"<span class='metric-chip'>{_escape(m)}</span>" for m in compare_metrics[:8])
        compare_html = f"<div class='metric-chip-wrap'><strong>Compared metrics:</strong> {chips}</div>"
    matrix_html = _render_quant_metrics_matrix(reports, report_key="report", metrics_key="quant_metrics")
    conclusion = str(parsed.get("conclusion", "")).strip()
    conclusion_html = f"<div class='maturity-conclusion'><h4>Conclusion</h4><p>{_escape(conclusion)}</p></div>" if conclusion else ""
    return (
        "<div class='maturity-card'>"
        "<h3>Comparative Table Builder</h3>"
        "<div class='maturity-table-wrap'><table class='maturity-table'>"
        f"<thead><tr><th>Attribute</th>{header_cells}</tr></thead>"
        "<tbody>"
        f"<tr><th>Maturity Level</th>{''.join(maturity_cells)}</tr>"
        f"<tr><th>Key Evidence</th>{''.join(evidence_cells)}</tr>"
        "</tbody></table></div>"
        f"{compare_html}"
        f"{matrix_html}"
        f"{conclusion_html}"
        "</div>"
    )
def _format_num(v) -> str:
if v in (None, ""):
return "N/A"
try:
f = float(v)
if f.is_integer():
return str(int(f))
return f"{f:.2f}"
except Exception:
return str(v)
def _format_pct(v) -> str:
if v in (None, ""):
return "N/A"
try:
return f"{float(v):.2f}%"
except Exception:
s = str(v)
return s if s.endswith("%") else f"{s}%"
def _trend_badge(direction: str) -> str:
    """Render a trend direction (up/down/flat, else unknown) as a colored chip."""
    d = str(direction or "unknown").strip().lower()
    known = {"up": "up", "down": "down", "flat": "flat"}
    icon = known.get(d, "unknown")
    return f"<span class='metric-chip trend-{icon}'>{_escape(d)}</span>"
def _render_trend_metric_cell(metric_obj: dict) -> str:
    """Render one quant-metric dict as the HTML body of a comparator table cell.

    Shows value (with unit/period), intensity, attainment %, change magnitude,
    a trend badge, and an optional note. Non-dict input renders as "N/A".
    """
    if not isinstance(metric_obj, dict):
        return "<span class='muted'>N/A</span>"
    value = _format_num(metric_obj.get("value"))
    unit = metric_obj.get("unit")
    period = metric_obj.get("period")
    # Compose "value unit (period)", appending only the parts that exist.
    value_line = value
    if unit not in (None, "") and value != "N/A":
        value_line = f"{value} {unit}"
    if period not in (None, "") and value_line != "N/A":
        value_line = f"{value_line} ({period})"
    intensity = _format_num(metric_obj.get("intensity"))
    attainment = _format_pct(metric_obj.get("attainment_rate"))
    change = _format_num(metric_obj.get("change_magnitude"))
    note = _shorten_text(metric_obj.get("note", ""), 90)
    note_html = "" if not note else f"<div class='muted'>{_escape(note)}</div>"
    return (
        f"<div><strong>Value:</strong> {_escape(value_line)}</div>"
        f"<div><strong>Intensity:</strong> {_escape(intensity)}</div>"
        f"<div><strong>Attainment:</strong> {_escape(attainment)}</div>"
        f"<div><strong>Change:</strong> {_escape(change)}</div>"
        f"<div><strong>Trend:</strong> {_trend_badge(metric_obj.get('trend_direction', 'unknown'))}</div>"
        f"{note_html}"
    )
def _render_trend_quant_comparator_html(parsed: dict) -> str:
    """Render "Trend & Quant Comparator" output as an HTML comparison card.

    One column per report; rows: strength score, up to three evidence
    bullets, then one row per quant metric (rendered via
    _render_trend_metric_cell), plus highlight chips and a conclusion.
    Returns "" when `reports` is empty/invalid.
    """
    reports = parsed.get("reports", [])
    if not isinstance(reports, list) or not reports:
        return ""
    report_names = [str(r.get("report", "Unknown")) for r in reports if isinstance(r, dict)]
    if not report_names:
        return ""
    header = "".join(f"<th>{_escape(n)}</th>" for n in report_names)
    strength_cells = []
    evidence_cells = []
    metric_map = {}  # metric name -> {report name -> metric dict}
    for r in reports:
        if not isinstance(r, dict):
            continue
        rn = str(r.get("report", "Unknown"))
        strength_cells.append(f"<td>{_escape(_format_num(r.get('strength_score')))}</td>")
        ev = r.get("key_evidence", [])
        if isinstance(ev, list) and ev:
            bullets = "".join(f"<li>{_escape(_shorten_text(x, 130))}</li>" for x in ev[:3])
            evidence_cells.append(f"<td><ul class='maturity-list'>{bullets}</ul></td>")
        else:
            evidence_cells.append("<td><span class='muted'>N/A</span></td>")
        for m in r.get("quant_metrics", []) if isinstance(r.get("quant_metrics"), list) else []:
            if not isinstance(m, dict):
                continue
            metric = str(m.get("metric", "")).strip()
            if not metric:
                continue
            metric_map.setdefault(metric, {})[rn] = m
    metric_rows = []
    for metric in sorted(metric_map.keys()):
        cells = []
        for rn in report_names:
            cells.append(f"<td>{_render_trend_metric_cell(metric_map[metric].get(rn, {}))}</td>")
        metric_rows.append(f"<tr><th>{_escape(metric)}</th>{''.join(cells)}</tr>")
    if not metric_rows:
        # Placeholder row spanning all report columns.
        metric_rows = [f"<tr><th>Quant Metrics</th><td colspan='{len(report_names)}'><span class='muted'>No quantitative metrics returned.</span></td></tr>"]
    highlights = parsed.get("metric_highlights", [])
    highlight_html = ""
    if isinstance(highlights, list) and highlights:
        chips = "".join(f"<span class='metric-chip'>{_escape(_shorten_text(x, 60))}</span>" for x in highlights[:10])
        highlight_html = f"<div class='metric-chip-wrap'><strong>Metric Highlights:</strong> {chips}</div>"
    conclusion = str(parsed.get("conclusion", "")).strip()
    conclusion_html = f"<div class='maturity-conclusion'><h4>Conclusion</h4><p>{_escape(conclusion)}</p></div>" if conclusion else ""
    return (
        "<div class='maturity-card'>"
        "<h3>Trend & Quant Comparator</h3>"
        "<div class='maturity-table-wrap'><table class='maturity-table'>"
        f"<thead><tr><th>Attribute</th>{header}</tr></thead>"
        "<tbody>"
        f"<tr><th>Strength Score</th>{''.join(strength_cells)}</tr>"
        f"<tr><th>Key Evidence</th>{''.join(evidence_cells)}</tr>"
        f"{''.join(metric_rows)}"
        "</tbody></table></div>"
        f"{highlight_html}"
        f"{conclusion_html}"
        "</div>"
    )
def _render_benchmark_metric_cell(metric_obj: dict) -> str:
    """Render one benchmark dict as the HTML body of a benchmark table cell.

    Shows baseline/current/target values (with unit and period), attainment %,
    absolute and percentage deltas, intensity, a trend badge, and an optional
    note. Non-dict input renders as "N/A".
    """
    if not isinstance(metric_obj, dict):
        return "<span class='muted'>N/A</span>"
    unit = metric_obj.get("unit")
    base_v = _format_num(metric_obj.get("baseline_value"))
    cur_v = _format_num(metric_obj.get("current_value"))
    tgt_v = _format_num(metric_obj.get("target_value"))
    base_p = metric_obj.get("baseline_period")
    cur_p = metric_obj.get("current_period")
    tgt_p = metric_obj.get("target_period")

    def _with_unit(v):
        # Append the shared unit to present values only.
        if v == "N/A":
            return v
        return f"{v} {unit}" if unit not in (None, "") else v

    baseline = _with_unit(base_v)
    current = _with_unit(cur_v)
    target = _with_unit(tgt_v)
    # Append each period in parentheses when both value and period exist.
    baseline = f"{baseline} ({base_p})" if base_p not in (None, "") and baseline != "N/A" else baseline
    current = f"{current} ({cur_p})" if cur_p not in (None, "") and current != "N/A" else current
    target = f"{target} ({tgt_p})" if tgt_p not in (None, "") and target != "N/A" else target
    attainment = _format_pct(metric_obj.get("attainment_rate"))
    delta_abs = _format_num(metric_obj.get("delta_abs"))
    delta_pct = _format_pct(metric_obj.get("delta_percent"))
    intensity = _format_num(metric_obj.get("intensity"))
    note = _shorten_text(metric_obj.get("note", ""), 90)
    note_html = "" if not note else f"<div class='muted'>{_escape(note)}</div>"
    return (
        f"<div><strong>Baseline:</strong> {_escape(baseline)}</div>"
        f"<div><strong>Current:</strong> {_escape(current)}</div>"
        f"<div><strong>Target:</strong> {_escape(target)}</div>"
        f"<div><strong>Attainment:</strong> {_escape(attainment)}</div>"
        f"<div><strong>Delta (abs/%):</strong> {_escape(delta_abs)} / {_escape(delta_pct)}</div>"
        f"<div><strong>Intensity:</strong> {_escape(intensity)}</div>"
        f"<div><strong>Trend:</strong> {_trend_badge(metric_obj.get('trend_direction', 'unknown'))}</div>"
        f"{note_html}"
    )
def _render_target_attainment_delta_html(parsed: dict) -> str:
    """Render "Target Attainment & Delta Benchmark" output as an HTML card.

    One column per report; rows: overall-strength badge, up to three evidence
    bullets, then one row per benchmark metric (via
    _render_benchmark_metric_cell), an optional leaderboard table, and a
    conclusion. Returns "" when `reports` is empty/invalid.
    """
    reports = parsed.get("reports", [])
    if not isinstance(reports, list) or not reports:
        return ""
    report_names = [str(r.get("report", "Unknown")) for r in reports if isinstance(r, dict)]
    if not report_names:
        return ""
    header = "".join(f"<th>{_escape(n)}</th>" for n in report_names)
    strength_cells = []
    evidence_cells = []
    metric_map = {}  # metric name -> {report name -> benchmark dict}
    for r in reports:
        if not isinstance(r, dict):
            continue
        rn = str(r.get("report", "Unknown"))
        strength = str(r.get("overall_strength", "insufficient")).strip()
        low = strength.lower()
        suffix = " x" if low == "insufficient" else ""  # visual marker for "insufficient"
        cls = f"level-{low}" if low in {"high", "moderate", "low", "insufficient"} else "level-unknown"
        strength_cells.append(f'<td><span class="maturity-badge {cls}">{_escape(strength + suffix)}</span></td>')
        ev = r.get("key_evidence", [])
        if isinstance(ev, list) and ev:
            bullets = "".join(f"<li>{_escape(_shorten_text(x, 130))}</li>" for x in ev[:3])
            evidence_cells.append(f"<td><ul class='maturity-list'>{bullets}</ul></td>")
        else:
            evidence_cells.append("<td><span class='muted'>N/A</span></td>")
        for m in r.get("benchmarks", []) if isinstance(r.get("benchmarks"), list) else []:
            if not isinstance(m, dict):
                continue
            metric = str(m.get("metric", "")).strip()
            if not metric:
                continue
            metric_map.setdefault(metric, {})[rn] = m
    metric_rows = []
    for metric in sorted(metric_map.keys()):
        cells = []
        for rn in report_names:
            cells.append(f"<td>{_render_benchmark_metric_cell(metric_map[metric].get(rn, {}))}</td>")
        metric_rows.append(f"<tr><th>{_escape(metric)}</th>{''.join(cells)}</tr>")
    if not metric_rows:
        # Placeholder row spanning all report columns.
        metric_rows = [f"<tr><th>Benchmarks</th><td colspan='{len(report_names)}'><span class='muted'>No benchmark metrics returned.</span></td></tr>"]
    leaderboard = parsed.get("leaderboard", [])
    leaderboard_html = ""
    if isinstance(leaderboard, list) and leaderboard:
        rows = []
        for item in leaderboard[:6]:
            if not isinstance(item, dict):
                continue
            rows.append(
                "<tr>"
                f"<td>{_escape(item.get('report', ''))}</td>"
                f"<td>{_escape(_format_num(item.get('score')))}</td>"
                f"<td>{_escape(_shorten_text(item.get('reason', ''), 120))}</td>"
                "</tr>"
            )
        if rows:
            leaderboard_html = (
                "<div class='maturity-table-wrap' style='margin-top:10px;'>"
                "<table class='maturity-table'>"
                "<thead><tr><th>Leaderboard Report</th><th>Score</th><th>Reason</th></tr></thead>"
                f"<tbody>{''.join(rows)}</tbody>"
                "</table></div>"
            )
    conclusion = str(parsed.get("conclusion", "")).strip()
    conclusion_html = f"<div class='maturity-conclusion'><h4>Conclusion</h4><p>{_escape(conclusion)}</p></div>" if conclusion else ""
    return (
        "<div class='maturity-card'>"
        "<h3>Target Attainment & Delta Benchmark</h3>"
        "<div class='maturity-table-wrap'><table class='maturity-table'>"
        f"<thead><tr><th>Attribute</th>{header}</tr></thead>"
        "<tbody>"
        f"<tr><th>Overall Strength</th>{''.join(strength_cells)}</tr>"
        f"<tr><th>Key Evidence</th>{''.join(evidence_cells)}</tr>"
        f"{''.join(metric_rows)}"
        "</tbody></table></div>"
        f"{leaderboard_html}"
        f"{conclusion_html}"
        "</div>"
    )
def _render_compliance_checklist_html(parsed: dict) -> str:
    """Render "Compliance Checklist" output as an HTML card.

    One column per report; rows: pass/partial/fail summary, up to three
    evidence bullets, then one row per required check matched (case-
    insensitively) against each report's `checks` list, a quantitative
    matrix, and a conclusion. Returns "" when `reports` is empty/invalid.
    """
    reports = parsed.get("reports", [])
    if not isinstance(reports, list) or not reports:
        return ""
    required_items = parsed.get("required_checks", [])
    if not isinstance(required_items, list):
        required_items = []
    report_names = [str(r.get("report", "Unknown")) for r in reports if isinstance(r, dict)]
    if not report_names:
        return ""
    header = "".join(f"<th>{_escape(n)}</th>" for n in report_names)
    summary_cells = []
    evidence_cells = []
    for r in reports:
        if not isinstance(r, dict):
            continue
        s = r.get("summary", {}) if isinstance(r.get("summary"), dict) else {}
        summary_cells.append(
            "<td>"
            f"pass={_escape(s.get('pass', 0))}, partial={_escape(s.get('partial', 0))}, fail={_escape(s.get('fail', 0))}"
            f"<br>completion={_escape(s.get('completion_rate', 'N/A'))}"
            "</td>"
        )
        ev = r.get("key_evidence", [])
        if isinstance(ev, list) and ev:
            bullets = "".join(f"<li>{_escape(_shorten_text(x, 130))}</li>" for x in ev[:3])
            evidence_cells.append(f"<td><ul class='maturity-list'>{bullets}</ul></td>")
        else:
            evidence_cells.append("<td><span class='muted'>N/A</span></td>")
    item_rows = []
    for item in required_items:
        cells = []
        for r in reports:
            # Defaults shown when the report carries no matching check entry.
            status = "insufficient"
            quant = "N/A"
            note = ""
            checks = r.get("checks", []) if isinstance(r, dict) else []
            if isinstance(checks, list):
                for c in checks:
                    if not isinstance(c, dict):
                        continue
                    # Case-insensitive match of the check item name.
                    if str(c.get("item", "")).strip().lower() == str(item).strip().lower():
                        status = str(c.get("status", "insufficient"))
                        qv = c.get("quant_value", None)
                        qu = c.get("quant_unit", None)
                        quant = "N/A" if qv in (None, "") else f"{qv}{'' if qu in (None, '') else ' ' + str(qu)}"
                        note = _shorten_text(c.get("note", ""), 90)
                        break
            mark = " x" if status.lower() in {"fail", "insufficient"} else ""  # failure marker
            cells.append(f"<td><strong>{_escape(status + mark)}</strong><br>{_escape(quant)}<br><span class='muted'>{_escape(note)}</span></td>")
        item_rows.append(f"<tr><th>{_escape(item)}</th>{''.join(cells)}</tr>")
    matrix_html = _render_quant_metrics_matrix(reports, report_key="report", metrics_key="quant_metrics")
    conclusion = str(parsed.get("conclusion", "")).strip()
    conclusion_html = f"<div class='maturity-conclusion'><h4>Conclusion</h4><p>{_escape(conclusion)}</p></div>" if conclusion else ""
    return (
        "<div class='maturity-card'>"
        "<h3>Compliance Checklist</h3>"
        "<div class='maturity-table-wrap'><table class='maturity-table'>"
        f"<thead><tr><th>Checklist Item</th>{header}</tr></thead>"
        "<tbody>"
        f"<tr><th>Summary</th>{''.join(summary_cells)}</tr>"
        f"<tr><th>Key Evidence</th>{''.join(evidence_cells)}</tr>"
        f"{''.join(item_rows)}"
        "</tbody></table></div>"
        f"{matrix_html}"
        f"{conclusion_html}"
        "</div>"
    )
def _render_dimension_extractor_html(parsed: dict) -> str:
reports = parsed.get("reports", [])
if not isinstance(reports, list) or not reports:
return ""
report_names = [str(r.get("report", "Unknown")) for r in reports if isinstance(r, dict)]
if not report_names:
return ""
header = "".join(f"<th>{_escape(n)}</th>" for n in report_names)
rows = []
for b in ["Process", "Input", "Output", "Outcome", "Governance", "Risk"]:
cells = []
for r in reports:
bc = r.get("bucket_counts", {}) if isinstance(r, dict) and isinstance(r.get("bucket_counts"), dict) else {}
cells.append(f"<td>{_escape(bc.get(b, 0))}</td>")
rows.append(f"<tr><th>{_escape(b)}</th>{''.join(cells)}</tr>")
coverage_cells = []
evidence_cells = []
for r in reports:
level = str(r.get("coverage_level", "unknown")) if isinstance(r, dict) else "unknown"
low = level.lower()
suffix = " x" if low == "insufficient" else ""
cls = f"level-{low}" if low in {"high", "moderate", "low", "insufficient"} else "level-unknown"
coverage_cells.append(f'<td><span class="maturity-badge {cls}">{_escape(level + suffix)}</span></td>')
ev = r.get("key_evidence", []) if isinstance(r, dict) else []
if isinstance(ev, list) and ev:
bullets = "".join(f"<li>{_escape(_shorten_text(x, 130))}</li>" for x in ev[:3])
evidence_cells.append(f"<td><ul class='maturity-list'>{bullets}</ul></td>")
else:
evidence_cells.append("<td><span class='muted'>N/A</span></td>")
matrix_html = _render_quant_metrics_matrix(reports, report_key="report", metrics_key="quant_metrics")
conclusion = str(parsed.get("conclusion", "")).strip()
conclusion_html = f"<div class='maturity-conclusion'><h4>Conclusion</h4><p>{_escape(conclusion)}</p></div>" if conclusion else ""
return (
"<div class='maturity-card'>"
"<h3>Dimension Extractor</h3>"
"<div class='maturity-table-wrap'><table class='maturity-table'>"
f"<thead><tr><th>Bucket</th>{header}</tr></thead>"
"<tbody>"
f"{''.join(rows)}"
f"<tr><th>Coverage Level</th>{''.join(coverage_cells)}</tr>"
f"<tr><th>Key Evidence</th>{''.join(evidence_cells)}</tr>"
"</tbody></table></div>"
f"{matrix_html}"
f"{conclusion_html}"
"</div>"
)
def _render_consistency_check_html(parsed: dict) -> str:
checks = parsed.get("checks", [])
scores = parsed.get("scores", {}) if isinstance(parsed.get("scores"), dict) else {}
if not isinstance(checks, list):
checks = []
check_rows = []
for c in checks:
if not isinstance(c, dict):
continue
result = str(c.get("result", "insufficient")).strip()
mark = " x" if result.lower() in {"inconsistent", "insufficient"} else ""
check_rows.append(
"<tr>"
f"<td>{_escape(c.get('rule', ''))}</td>"
f"<td>{_escape(result + mark)}</td>"
f"<td>{_escape(_shorten_text(c.get('note', ''), 180))}</td>"
"</tr>"
)
check_rows_html = "".join(check_rows) if check_rows else "<tr><td colspan='3'>No checks returned.</td></tr>"
key_evidence = parsed.get("key_evidence", [])
key_evidence_html = ""
if isinstance(key_evidence, list) and key_evidence:
bullets = "".join(f"<li>{_escape(_shorten_text(x, 180))}</li>" for x in key_evidence[:6])
key_evidence_html = (
"<div class='maturity-table-wrap' style='margin-top:10px;'><table class='maturity-table'>"
"<thead><tr><th>Key Evidence</th></tr></thead>"
f"<tbody><tr><td><ul class='maturity-list'>{bullets}</ul></td></tr></tbody>"
"</table></div>"
)
conclusion = str(parsed.get("conclusion", "")).strip()
conclusion_html = f"<div class='maturity-conclusion'><h4>Conclusion</h4><p>{_escape(conclusion)}</p></div>" if conclusion else ""
return (
"<div class='maturity-card'>"
"<h3>Contradiction / Consistency Check</h3>"
"<div class='maturity-table-wrap'><table class='maturity-table'>"
"<thead><tr><th>Rule</th><th>Result</th><th>Note</th></tr></thead>"
f"<tbody>{check_rows_html}</tbody>"
"</table></div>"
f"{key_evidence_html}"
"<div class='maturity-table-wrap' style='margin-top:10px;'><table class='maturity-table'>"
"<thead><tr><th>Score Item</th><th>Value</th></tr></thead>"
"<tbody>"
f"<tr><th>consistent</th><td>{_escape(scores.get('consistent', 0))}</td></tr>"
f"<tr><th>inconsistent</th><td>{_escape(scores.get('inconsistent', 0))}</td></tr>"
f"<tr><th>insufficient</th><td>{_escape(scores.get('insufficient', 0))}</td></tr>"
f"<tr><th>consistency_rate</th><td>{_escape(scores.get('consistency_rate', 'N/A'))}</td></tr>"
"</tbody></table></div>"
f"{conclusion_html}"
"</div>"
)
def _render_consensus_count_html(parsed: dict) -> str:
counts = parsed.get("counts", {}) if isinstance(parsed.get("counts"), dict) else {}
percentages = parsed.get("percentages", {}) if isinstance(parsed.get("percentages"), dict) else {}
per_report = parsed.get("per_report", [])
if not isinstance(per_report, list):
per_report = []
ev_map = {}
key_evidence_by_report = parsed.get("key_evidence_by_report", [])
if isinstance(key_evidence_by_report, list):
for row in key_evidence_by_report:
if not isinstance(row, dict):
continue
report = str(row.get("report", "")).strip()
ev = row.get("key_evidence", [])
if report and isinstance(ev, list):
ev_map[report] = ev
report_rows = []
for r in per_report:
if not isinstance(r, dict):
continue
label = str(r.get("label", "insufficient"))
mark = " x" if label.lower() in {"missing", "insufficient"} else ""
report = str(r.get("report", ""))
ev = ev_map.get(report, r.get("key_evidence", []))
ev_html = "<span class='muted'>N/A</span>"
if isinstance(ev, list) and ev:
ev_html = "<ul class='maturity-list'>" + "".join(f"<li>{_escape(_shorten_text(x, 120))}</li>" for x in ev[:2]) + "</ul>"
report_rows.append("<tr><td>{}</td><td>{}</td><td>{}</td></tr>".format(_escape(report), _escape(label + mark), ev_html))
report_rows_html = "".join(report_rows) if report_rows else "<tr><td colspan='3'>No report labels returned.</td></tr>"
matrix_html = _render_quant_metrics_matrix(per_report, report_key="report", metrics_key="quant_metrics")
consensus_items = parsed.get("consensus_items", [])
outliers = parsed.get("outliers", [])
consensus_html = "".join(f"<li>{_escape(_shorten_text(x, 140))}</li>" for x in consensus_items[:6]) if isinstance(consensus_items, list) else ""
outliers_html = "".join(f"<li>{_escape(_shorten_text(x, 140))}</li>" for x in outliers[:6]) if isinstance(outliers, list) else ""
conclusion = str(parsed.get("conclusion", "")).strip()
conclusion_html = f"<div class='maturity-conclusion'><h4>Conclusion</h4><p>{_escape(conclusion)}</p></div>" if conclusion else ""
return (
"<div class='maturity-card'>"
"<h3>Consensus / Count (Portfolio Statistics)</h3>"
"<div class='maturity-table-wrap'><table class='maturity-table'>"
"<thead><tr><th>Count Item</th><th>Value</th></tr></thead>"
"<tbody>"
f"<tr><th>explicit</th><td>{_escape(counts.get('explicit', 0))} ({_escape(percentages.get('explicit', 'N/A'))}%)</td></tr>"
f"<tr><th>partial</th><td>{_escape(counts.get('partial', 0))} ({_escape(percentages.get('partial', 'N/A'))}%)</td></tr>"
f"<tr><th>missing</th><td>{_escape(counts.get('missing', 0))} ({_escape(percentages.get('missing', 'N/A'))}%)</td></tr>"
f"<tr><th>total</th><td>{_escape(counts.get('total', len(per_report)))}</td></tr>"
"</tbody></table></div>"
"<div class='maturity-table-wrap' style='margin-top:10px;'><table class='maturity-table'>"
"<thead><tr><th>Report</th><th>Label</th><th>Key Evidence</th></tr></thead>"
f"<tbody>{report_rows_html}</tbody>"
"</table></div>"
f"{matrix_html}"
"<div class='maturity-split'>"
f"<div><h4>Consensus Items</h4><ul class='maturity-list'>{consensus_html or '<li>None</li>'}</ul></div>"
f"<div><h4>Outliers</h4><ul class='maturity-list'>{outliers_html or '<li>None</li>'}</ul></div>"
"</div>"
f"{conclusion_html}"
"</div>"
)
def _coerce_payload_for_ui(payload):
if isinstance(payload, list):
if payload and all(isinstance(x, dict) and str(x.get("report", "")).strip() for x in payload):
return {
"skill": "Comparative Table Builder",
"reports": payload,
}
return payload
if not isinstance(payload, dict):
return payload
if isinstance(payload.get("reports"), list) or isinstance(payload.get("maturity_comparison"), dict):
return payload
is_single_report_record = (
str(payload.get("report", "")).strip() != ""
and any(k in payload for k in ("maturity_level", "key_evidence", "quant_metrics", "comparison_metrics", "year"))
and not any(k in payload for k in ("answer", "explanation", "evidence_ids", "rows", "ranking"))
)
if not is_single_report_record:
return payload
report_item = {
"report": payload.get("report", "Unknown"),
"year": payload.get("year", None),
"maturity_level": payload.get("maturity_level", "unknown"),
"key_evidence": payload.get("key_evidence", []) if isinstance(payload.get("key_evidence"), list) else [],
"quant_metrics": payload.get("quant_metrics", []) if isinstance(payload.get("quant_metrics"), list) else [],
}
normalized = {
"skill": str(payload.get("skill", "")).strip() or "Comparative Table Builder",
"reports": [report_item],
}
if "dimension" in payload:
normalized["dimension"] = payload.get("dimension")
if isinstance(payload.get("comparison_metrics"), list):
normalized["comparison_metrics"] = payload.get("comparison_metrics")
if "conclusion" in payload:
normalized["conclusion"] = payload.get("conclusion")
if "confidence_distribution" in payload:
normalized["confidence_distribution"] = payload.get("confidence_distribution")
return normalized
def _render_skill_specific_html(parsed: dict) -> str:
legacy = _render_maturity_comparison_html(parsed)
if legacy:
return _attach_confidence_block(legacy, parsed)
skill = _normalize_skill_for_render(parsed)
html = ""
if skill == "Trend & Quant Comparator":
html = _render_trend_quant_comparator_html(parsed)
if skill == "Target Attainment & Delta Benchmark":
html = _render_target_attainment_delta_html(parsed)
if skill == "Comparative Table Builder":
html = _render_comparative_table_builder_html(parsed)
if skill == "Compliance Checklist":
html = _render_compliance_checklist_html(parsed)
if skill == "Dimension Extractor":
html = _render_dimension_extractor_html(parsed)
if skill == "Contradiction/Consistency Check":
html = _render_consistency_check_html(parsed)
if skill == "Consensus/Count (Portfolio Statistics)":
html = _render_consensus_count_html(parsed)
if not html:
return ""
return _attach_confidence_block(html, parsed)
def _extract_json_payload(text: str):
"""Extract a JSON value from mixed model output text."""
if not text:
return None
decoder = json.JSONDecoder()
# 1) Whole string is JSON.
try:
return json.loads(text)
except Exception:
pass
# 2) JSON fenced blocks.
for block in re.findall(r"```(?:json)?\s*([\s\S]*?)```", text, flags=re.IGNORECASE):
try:
return json.loads(block.strip())
except Exception:
continue
# 3) Marker-based extraction.
marker = "Final Answer in JSON:"
if marker in text:
tail = text.split(marker, 1)[1].strip()
if tail:
try:
obj, _ = decoder.raw_decode(tail)
return obj
except Exception:
pass
# 4) Scan every '{' and try raw_decode on that suffix.
candidates = []
for i, ch in enumerate(text):
if ch != "{":
continue
try:
obj, end = decoder.raw_decode(text[i:])
consumed = text[i:i + end]
candidates.append((obj, len(consumed)))
except Exception:
continue
if not candidates:
return None
# Prefer dicts with known answer schemas, else the largest parsed candidate.
def _score(item):
obj, consumed_len = item
schema_bonus = 0
if isinstance(obj, dict):
if any(k in obj for k in ("dimension", "rows", "ranking")):
schema_bonus += 10
if any(k in obj for k in ("answer", "explanation", "evidence_ids")):
schema_bonus += 10
if "maturity_comparison" in obj:
schema_bonus += 15
if any(k in obj for k in ("reports", "counts", "checks", "bucket_counts", "per_report")):
schema_bonus += 12
if "skill" in obj:
schema_bonus += 8
if "confidence_distribution" in obj:
schema_bonus += 6
return (schema_bonus, consumed_len)
candidates.sort(key=_score, reverse=True)
return candidates[0][0]
def _format_answer(answer: str) -> str:
if not answer:
return ""
parsed = _coerce_payload_for_ui(_extract_json_payload(answer))
if isinstance(parsed, dict):
low_keys = {str(k).strip().lower() for k in parsed.keys()}
if low_keys and low_keys.issubset({"high", "medium", "low"}):
conf = _normalize_confidence_distribution(parsed)
if conf:
cleaned = re.sub(
r"confidence\s*distribution\s*:\s*\{[^{}]+\}",
"",
str(answer),
flags=re.IGNORECASE,
).strip()
body = _escape(cleaned).replace("\n", "<br>")
return f"<div>{body}</div>{_render_confidence_distribution(conf)}"
if parsed is None:
conf, cleaned = _extract_confidence_distribution_from_text(answer)
conf_html = _render_confidence_distribution(conf)
if conf_html:
body = _escape(cleaned).replace("\n", "<br>")
return f"<div>{body}</div>{conf_html}"
return answer
if not isinstance(parsed, dict):
return f"```json\n{json.dumps(parsed, ensure_ascii=False, indent=2)}\n```"
skill_html = _render_skill_specific_html(parsed)
if skill_html:
return skill_html
parts = []
dimension = parsed.get("dimension")
if dimension:
parts.append(f"**Dimension:** {_md_cell(dimension)}")
rows = parsed.get("rows")
if isinstance(rows, list) and rows:
table_rows = []
for item in rows:
if not isinstance(item, dict):
continue
table_rows.append({
"Report": item.get("report", ""),
"Year": item.get("year", ""),
"Status": item.get("disclosure_status", ""),
"Key Points": len(item.get("key_points") or []),
"Evidence": len(item.get("evidence_chunks") or []),
})
if table_rows:
parts.append("### Comparison")
parts.append(_md_table(table_rows, ["Report", "Year", "Status", "Key Points", "Evidence"]))
ranking = parsed.get("ranking")
if isinstance(ranking, list) and ranking:
ranking_rows = []
for item in ranking:
if not isinstance(item, dict):
continue
ranking_rows.append({
"Rank": item.get("rank", ""),
"Report": item.get("report", ""),
"Rationale": _truncate(item.get("rationale", "")),
})
if ranking_rows:
parts.append("### Ranking")
parts.append(_md_table(ranking_rows, ["Rank", "Report", "Rationale"]))
conclusion = parsed.get("conclusion")
if isinstance(conclusion, str) and conclusion.strip():
parts.append("### Conclusion")
parts.append(_md_cell(conclusion))
# Generic JSON-answer schema fallback.
if "answer" in parsed or "explanation" in parsed or "evidence_ids" in parsed:
ans = parsed.get("answer", "")
exp = parsed.get("explanation", "")
if ans:
parts.append("### Answer")
parts.append(_md_cell(ans))
if exp:
parts.append("### Explanation")
parts.append(_md_cell(exp))
ev_ids = parsed.get("evidence_ids")
if isinstance(ev_ids, list) and ev_ids:
parts.append(f"### Retrieved Sources Count\n{len(ev_ids)}")
# Show remaining scalar fields in a compact table.
skip_keys = {"answer", "explanation", "evidence_ids", "dimension", "rows", "ranking"}
extra_rows = []
for k, v in parsed.items():
if k in skip_keys:
continue
if isinstance(v, (str, int, float, bool)) or v is None:
extra_rows.append({"Field": k, "Value": _md_cell(v)})
if extra_rows:
parts.append("### Extra Fields")
parts.append(_md_table(extra_rows, ["Field", "Value"]))
conf_html = _render_confidence_distribution(parsed.get("confidence_distribution"))
if conf_html:
parts.append(conf_html)
if parts:
return "\n\n".join(parts)
return f"```json\n{json.dumps(parsed, ensure_ascii=False, indent=2)}\n```"
# ======================== Handlers ========================
def on_doc_mode_change(doc_mode):
if doc_mode == "Single-document":
return (
gr.update(placeholder=PLACEHOLDER_SINGLE, value=""),
gr.update(
value=(
'<p class="hint-text">'
'\U0001f4a1 Tip: We recommend prefixing your question with the report name, '
'e.g. <i>"For [Report Name], does the company ...?"</i>'
'</p>'
),
visible=True,
),
)
return (
gr.update(placeholder=PLACEHOLDER_MULTI, value=""),
gr.update(
value=(
'<p class="hint-text">'
'\U0001f4a1 Tip: We recommend prefixing your question with the report name, '
'e.g. <i>"For [Report 1 Name] and [Report 2 Name], does ...?"</i>'
'</p>'
),
visible=True,
),
)
def on_model_selection_change(gen_model, embed_model):
use_api_gen = "(API)" in str(gen_model)
use_gemini_gen = use_api_gen and _is_gemini_generation_model(gen_model)
needs_openai_key = (str(embed_model) in OPENAI_EMBED_MODELS_SET) or (use_api_gen and not use_gemini_gen)
needs_gemini_key = use_gemini_gen
return (
gr.update(visible=needs_openai_key),
gr.update(visible=needs_gemini_key),
)
def on_report_select(report_name):
if not report_name:
return (
"<p>No report selected.</p>",
1,
1,
"Page: 1 / 1",
gr.update(interactive=False),
gr.update(interactive=False),
)
total = _get_pdf_total_pages(report_name)
return (
_pdf_iframe(report_name, page=1),
1,
total,
f"Page: 1 / {total}",
gr.update(interactive=False),
gr.update(interactive=total > 1),
)
def on_prev_page(report_name, current_page, total_pages):
if not report_name:
return (
"<p>No report selected.</p>",
1,
1,
"Page: 1 / 1",
gr.update(interactive=False),
gr.update(interactive=False),
)
total = max(1, int(total_pages or 1))
page = max(1, int(current_page or 1) - 1)
return (
_pdf_iframe(report_name, page=page),
page,
total,
f"Page: {page} / {total}",
gr.update(interactive=page > 1),
gr.update(interactive=page < total),
)
def on_next_page(report_name, current_page, total_pages):
if not report_name:
return (
"<p>No report selected.</p>",
1,
1,
"Page: 1 / 1",
gr.update(interactive=False),
gr.update(interactive=False),
)
total = max(1, int(total_pages or 1))
page = min(total, max(1, int(current_page or 1) + 1))
return (
_pdf_iframe(report_name, page=page),
page,
total,
f"Page: {page} / {total}",
gr.update(interactive=page > 1),
gr.update(interactive=page < total),
)
def on_run_start():
return "## Waiting......", "", _render_waiting("Waiting......")
def _has_openai_api_key(local_api_key: str) -> bool:
if str(local_api_key or "").strip():
return True
if os.getenv("OPENAI_API_KEY", "").strip():
return True
if os.getenv("OPENAI_API_KEY_88996", "").strip():
return True
return False
def _has_gemini_api_key(local_api_key: str) -> bool:
if str(local_api_key or "").strip():
return True
if os.getenv("GEMINI_API_KEY", "").strip():
return True
return False
def do_query(question, doc_mode_label, rag_mode, embed_model,
gen_model, openai_api_key, gemini_api_key, top_k):
empty_btns = _default_claim_button_updates()
empty_state_contexts = []
empty_state_trace = []
openai_key = str(openai_api_key or "").strip()
gemini_key = str(gemini_api_key or "").strip()
if not question or not question.strip():
yield "\u26a0\ufe0f Please enter a question.", "", "", "", "", empty_state_contexts, empty_state_trace, *empty_btns
return
if (not HAS_GPU) and ("(API)" not in str(gen_model)):
msg = "\u26a0\ufe0f No GPU detected. Please use an API generation model."
yield msg, "", msg, "", "", empty_state_contexts, empty_state_trace, *empty_btns
return
if (str(embed_model) in OPENAI_EMBED_MODELS_SET) and (not _has_openai_api_key(openai_key)):
msg = (
"\u26a0\ufe0f OpenAI embedding model selected but API key is missing. "
"Please input API key or set OPENAI_API_KEY."
)
yield msg, "", msg, "", "", empty_state_contexts, empty_state_trace, *empty_btns
return
if "(API)" in str(gen_model):
if _is_gemini_generation_model(gen_model):
if not _has_gemini_api_key(gemini_key):
msg = (
"\u26a0\ufe0f Gemini API generation model selected but API key is missing. "
"Please input API key or set GEMINI_API_KEY."
)
yield msg, "", msg, "", "", empty_state_contexts, empty_state_trace, *empty_btns
return
elif not _has_openai_api_key(openai_key):
msg = (
"\u26a0\ufe0f OpenAI API generation model selected but API key is missing. "
"Please input API key or set OPENAI_API_KEY."
)
yield msg, "", msg, "", "", empty_state_contexts, empty_state_trace, *empty_btns
return
if openai_key:
os.environ["OPENAI_API_KEY"] = openai_key
if gemini_key:
os.environ["GEMINI_API_KEY"] = gemini_key
backend_api_key = openai_key
if (not backend_api_key) and _is_gemini_generation_model(gen_model):
backend_api_key = gemini_key
doc_mode = "single" if doc_mode_label == "Single-document" else "multi"
rag_mode = str(rag_mode or "ClimateRAG")
q = question.strip()
try:
base_top_k = max(1, int(top_k))
except Exception:
base_top_k = 5
t0 = time.perf_counter()
if rag_mode != "ClimateRAG":
answer, contexts = run_rag(
question=q,
chunk_mode="length",
doc_mode=doc_mode,
top_k=base_top_k,
embed_name=embed_model,
gen_model=gen_model,
api_key=backend_api_key,
)
elapsed = time.perf_counter() - t0
answer_md = _format_answer(answer)
evidence_md = _format_evidence(contexts)
status = f"\u2705 Baseline RAG complete: retrieved {len(contexts)} passages."
timing_md = f"\u23f1\ufe0f **Elapsed:** `{elapsed:.2f}s`"
pipeline_md = (
"## Baseline RAG\n"
f"- Retrieved **{len(contexts)}** passages\n"
"- Single-step retrieval + generation completed."
)
yield answer_md, evidence_md, status, timing_md, pipeline_md, contexts, [], *empty_btns
return
answer_md = "*Waiting for STEP 2 answer...*"
evidence_md = "*Waiting for retrieval...*"
status = "⏳ ClimateRAG pipeline started."
timing_md = ""
pipeline_md = _render_waiting("STEP 1 Waiting......")
yield answer_md, evidence_md, status, timing_md, pipeline_md, empty_state_contexts, empty_state_trace, *empty_btns
try:
# ---------- Step 1: retrieval + clustering ----------
step1 = run_trustworthy_step1(
question=q,
doc_mode=doc_mode,
top_k=base_top_k,
embed_name=embed_model,
gen_model=gen_model,
api_key=backend_api_key,
)
step1_md = _render_step1_clusters_md(step1)
evidence_md = _format_evidence(step1.get("contexts", []))
status = "⏳ STEP 1 completed. Running STEP 2..."
pipeline_md = step1_md + "\n\n---\n\n" + _render_waiting("STEP 2 Waiting......")
yield answer_md, evidence_md, status, "", pipeline_md, step1.get("contexts", []), empty_state_trace, *empty_btns
# ---------- Step 2: answer generation ----------
step2 = run_trustworthy_step2(
question=q,
doc_mode=doc_mode,
contexts=step1.get("contexts", []),
clusters=step1.get("clusters", []),
gen_model=gen_model,
api_key=backend_api_key,
)
answer_md = _format_answer(step2.get("answer", ""))
step2_md = _render_step2_claims_md(step2)
status = "⏳ STEP 2 completed. Running STEP 3..."
pipeline_md = step1_md + "\n\n---\n\n" + step2_md + "\n\n---\n\n" + _render_waiting("STEP 3 Waiting......")
yield answer_md, evidence_md, status, "", pipeline_md, step1.get("contexts", []), empty_state_trace, *empty_btns
# ---------- Step 3: claim extractor ----------
step3 = run_trustworthy_step3_claims(
question=q,
answer=step2.get("answer", ""),
contexts=step1.get("contexts", []),
doc_mode=doc_mode,
gen_model=gen_model,
api_key=backend_api_key,
)
step3_md = _render_step3_md(step3)
step2_summary_md = _render_step2_summary_md(step2)
final_trace = _prepare_claim_trace(step3)
btn_updates = _claim_button_updates(final_trace)
elapsed = time.perf_counter() - t0
status = (
"\u2705 ClimateRAG pipeline completed: "
f"{len(step1.get('contexts', []))} passages, {len(step3.get('claims', []))} claims extracted."
)
timing_md = f"\u23f1\ufe0f **Elapsed:** `{elapsed:.2f}s`"
pipeline_md = step1_md + "\n\n---\n\n" + step2_summary_md + "\n\n---\n\n" + step3_md
yield answer_md, evidence_md, status, timing_md, pipeline_md, step1.get("contexts", []), final_trace, *btn_updates
return
except Exception as e:
elapsed = time.perf_counter() - t0
err = f"\u26a0\ufe0f ClimateRAG pipeline failed: {e}"
timing_md = f"\u23f1\ufe0f **Elapsed before failure:** `{elapsed:.2f}s`"
yield "*ClimateRAG pipeline failed.*", "", err, timing_md, f"{_render_waiting('Waiting......')}\n\n{err}", empty_state_contexts, empty_state_trace, *empty_btns
return
def build_report_name_list():
"""Build report name list without requiring local PDF files."""
reports = list_reports()
names = sorted({str(r.get("name", "")).strip() for r in reports if isinstance(r, dict) and str(r.get("name", "")).strip()})
if names:
return names
# Fallback to chunk JSON source names when Reports/ is removed.
try:
chunks = get_report_chunks("structure")
names = sorted([str(x).strip() for x in chunks.keys() if str(x).strip()])
if names:
return names
except Exception:
pass
try:
chunks = get_report_chunks("length")
names = sorted([str(x).strip() for x in chunks.keys() if str(x).strip()])
if names:
return names
except Exception:
pass
return []
def render_report_names_md(names):
if not names:
return "_No report names found from local PDFs or chunk JSON sources._"
lines = [f"### Report Names ({len(names)})", ""]
lines.extend([f"- `{n}`" for n in names])
return "\n".join(lines)
# ======================== CSS ========================
CUSTOM_CSS = """
:root {
--font: "Segoe UI", Roboto, Helvetica, Arial, sans-serif !important;
}
html, body, button, input, textarea, select {
font-family: "Segoe UI", Roboto, Helvetica, Arial, sans-serif !important;
-webkit-font-smoothing: antialiased;
-moz-osx-font-smoothing: grayscale;
}
/* Hide image toolbar buttons */
.gradio-image button,
.gradio-image .absolute {
display: none !important;
}
footer { display: none !important; }
.built-with { display: none !important; }
.hint-text {
color: #666;
font-size: 0.95em;
margin-top: 2px;
margin-bottom: 8px;
width: 100%;
max-width: none;
line-height: 1.45;
white-space: normal;
}
.logo-header {
display: flex;
align-items: center;
justify-content: center;
padding: 20px 16px 12px 16px;
background: linear-gradient(180deg, #f0f7ff 0%, #ffffff 100%);
border-radius: 12px;
margin-bottom: 8px;
min-height: 124px;
}
.logo-header-text {
text-align: center;
width: 100%;
max-width: 900px;
}
.logo-header h2 {
margin: 0 0 2px 0;
color: #1a5276;
font-size: 1.45em;
letter-spacing: 0.02em;
}
.logo-header p {
color: #666;
font-size: 0.95em;
margin: 0;
}
.mock-banner {
background: #fff3cd;
border: 1px solid #ffc107;
border-radius: 8px;
padding: 12px 20px;
margin: 0 0 12px 0;
color: #856404;
font-size: 0.92em;
}
.custom-footer {
text-align: center;
padding: 14px 0;
color: #bbb;
font-size: 0.83em;
border-top: 1px solid #eee;
margin-top: 20px;
}
.waiting-banner {
font-size: 2rem;
font-weight: 700;
color: #d35400;
text-align: center;
}
.maturity-card {
border: 1px solid #dbe7f3;
border-radius: 12px;
padding: 14px;
background: linear-gradient(180deg, #f8fbff 0%, #ffffff 100%);
}
.maturity-card h3 {
margin: 0 0 12px 0;
color: #154360;
}
.maturity-card h4 {
margin: 10px 0 6px 0;
color: #1b4f72;
}
.maturity-table-wrap {
overflow-x: auto;
}
.maturity-table {
width: 100%;
border-collapse: collapse;
font-size: 0.95rem;
}
.maturity-table th, .maturity-table td {
border: 1px solid #d6e4f0;
padding: 10px;
vertical-align: top;
text-align: left;
line-height: 1.45;
}
.maturity-table thead th {
background: #eaf3ff;
}
.maturity-table tbody tr:nth-child(even) {
background: #fbfdff;
}
.maturity-badge {
display: inline-block;
padding: 2px 10px;
border-radius: 999px;
font-weight: 600;
font-size: 0.86rem;
}
.maturity-badge.level-high { background: #e8f8f0; color: #117864; }
.maturity-badge.level-moderate { background: #fff4e5; color: #9c640c; }
.maturity-badge.level-low { background: #fdecea; color: #922b21; }
.maturity-badge.level-insufficient { background: #fdecea; color: #922b21; }
.maturity-badge.level-unknown { background: #eef2f7; color: #34495e; }
.maturity-list {
margin: 0;
padding-left: 18px;
}
.maturity-list li {
margin: 0 0 6px 0;
}
.muted { color: #9aa5b1; }
.maturity-conclusion {
margin-top: 12px;
border-top: 1px dashed #cad9e8;
padding-top: 10px;
}
.maturity-conclusion h4 {
margin: 0 0 6px 0;
color: #1b4f72;
}
.maturity-conclusion p {
margin: 0;
}
.maturity-note {
margin-top: 10px;
padding: 8px 10px;
border: 1px dashed #cad9e8;
border-radius: 8px;
color: #516274;
background: #f8fbff;
}
.metric-chip-wrap {
margin-top: 10px;
}
.metric-chip {
display: inline-block;
margin: 4px 6px 0 0;
padding: 4px 10px;
border-radius: 999px;
border: 1px solid #c8ddf2;
background: #edf5ff;
color: #1b4f72;
font-size: 0.84rem;
}
.metric-chip.trend-up { background: #e8f8f0; border-color: #bfe8d3; color: #117864; }
.metric-chip.trend-down { background: #fdecea; border-color: #f3c6c2; color: #922b21; }
.metric-chip.trend-flat { background: #fff4e5; border-color: #f2ddba; color: #9c640c; }
.metric-chip.trend-unknown { background: #eef2f7; border-color: #d6dce3; color: #5d6d7e; }
.maturity-split {
display: grid;
grid-template-columns: 1fr 1fr;
gap: 12px;
margin-top: 10px;
}
.confidence-card {
margin-top: 12px;
border: 1px solid #dbe7f3;
border-radius: 10px;
padding: 10px 12px;
background: #f8fbff;
}
.confidence-card h4 {
margin: 0 0 8px 0;
color: #1b4f72;
}
.conf-row {
display: grid;
grid-template-columns: 70px 1fr 60px;
gap: 8px;
align-items: center;
margin: 6px 0;
}
.conf-label {
font-weight: 600;
color: #34495e;
}
.conf-bar {
height: 10px;
border-radius: 999px;
background: #e9eff6;
overflow: hidden;
}
.conf-fill {
height: 100%;
border-radius: 999px;
}
.conf-fill.conf-high { background: #27ae60; }
.conf-fill.conf-medium { background: #f39c12; }
.conf-fill.conf-low { background: #e74c3c; }
.conf-value {
text-align: right;
color: #566573;
font-variant-numeric: tabular-nums;
}
@media (max-width: 900px) {
.logo-header {
min-height: auto;
padding-top: 12px;
padding-bottom: 12px;
flex-direction: column;
}
.maturity-split {
grid-template-columns: 1fr;
}
}
"""
# ======================== Gradio UI ========================
# Top-level Gradio Blocks app: a header banner, three tabs
# (Question Answering / Document Library / About) and a footer.
# Event wiring (.click / .change / .load) is declared at the end of Tab 1,
# after every referenced component has been created.
with gr.Blocks(
    title="Climate Disclosure RAG"
) as demo:
    # ---------- Header ----------
    gr.HTML(
        f"""
        <div class="logo-header">
        <div class="logo-header-text">
        <h2>Climate Disclosure RAG</h2>
        <p>AI-powered analysis of corporate sustainability &amp; climate disclosures</p>
        </div>
        </div>
        """
    )
    # ==================== Tab 1: Question Answering ====================
    with gr.Tab("\U0001f50d Question Answering"):
        gr.Markdown("### \U0001f4ac Ask a Question About Sustainability Reports")
        # Question-type selector; its .change handler (wired below) swaps the
        # question placeholder and toggles the hint visibility.
        with gr.Row():
            doc_mode_radio = gr.Radio(
                choices=["Multi-document", "Single-document"],
                value="Multi-document",
                label="Question Type",
                info="Single: ask about one report | Multi: compare across reports",
            )
        # Usage tip; shown/hidden by on_doc_mode_change.
        # NOTE(review): reconstructed nesting — presumably outside the Row
        # above (full-width hint); confirm against the original layout.
        single_hint = gr.Markdown(
            '<p class="hint-text">'
            '\U0001f4a1 Tip: We recommend prefixing your question with the report name, '
            'e.g. <i>"For [Report 1 Name] and [Report 2 Name], does ...?"</i>'
            '</p>',
            visible=True,
        )
        # Free-text question input; placeholder switches with the doc mode.
        question_box = gr.Textbox(
            label="Your Question",
            placeholder=PLACEHOLDER_MULTI,
            lines=3,
            max_lines=6,
            info='Please click "Use Example Question" to use the recommended question.',
        )
        use_example_btn = gr.Button("Use Example Question", variant="primary")
        gr.Markdown("#### \u2699\ufe0f Model Configuration")
        # Three side-by-side dropdowns: RAG mode, embedding model, generator.
        with gr.Row():
            with gr.Column(scale=1):
                rag_mode_dd = gr.Dropdown(
                    choices=["ClimateRAG", "Baseline RAG"],
                    value="ClimateRAG",
                    label="RAG Mode",
                )
            with gr.Column(scale=1):
                embed_model_dd = gr.Dropdown(
                    choices=EMBED_MODELS,
                    value=EMBED_MODELS[0],
                    label="\U0001f9e0 Embedding Model",
                )
            with gr.Column(scale=1):
                gen_model_dd = gr.Dropdown(
                    choices=GEN_MODELS,
                    value=(GEN_MODELS[0] if HAS_GPU else API_GEN_MODEL),
                    label="\U0001f916 Generation Model",
                )
        # On CPU-only hosts the GPU-only generators are excluded from
        # GEN_MODELS (see HEAD); surface that to the user.
        if not HAS_GPU:
            gr.Markdown(
                "<span style='color:#8a8a8a;'>GPU not detected: local generation models are disabled. "
                "Only API generation models are available.</span>"
            )
            gr.Markdown(
                "<span style='color:#b0b0b0;'>Disabled (GPU-only): "
                + ", ".join(GPU_GEN_MODELS)
                + "</span>"
            )
        # Initial visibility of the API-key boxes, derived from the default
        # model selections (mirrors the logic of on_model_selection_change,
        # which keeps them in sync after user changes).
        default_gen_model = GEN_MODELS[0] if HAS_GPU else API_GEN_MODEL
        default_embed_model = EMBED_MODELS[0]
        default_need_openai_key = (
            (default_embed_model in OPENAI_EMBED_MODELS_SET)
            or (("(API)" in str(default_gen_model)) and (not _is_gemini_generation_model(default_gen_model)))
        )
        default_need_gemini_key = ("(API)" in str(default_gen_model)) and _is_gemini_generation_model(default_gen_model)
        openai_api_key_box = gr.Textbox(
            label="\U0001f511 OpenAI API Key",
            type="password",
            placeholder="sk-...",
            value=DEFAULT_OPENAI_API_KEY,
            visible=default_need_openai_key,
            info="Required for OpenAI embedding models and OpenAI API generation models.",
        )
        gemini_api_key_box = gr.Textbox(
            label="\U0001f511 Gemini API Key",
            type="password",
            placeholder="AIza...",
            value=DEFAULT_GEMINI_API_KEY,
            visible=default_need_gemini_key,
            info="Required for Gemini API generation models.",
        )
        top_k_slider = gr.Slider(
            minimum=1, maximum=20, value=5, step=1,
            label="\U0001f3af Top-K Retrieved Passages",
        )
        submit_btn = gr.Button("\U0001f680 Run Analysis", variant="primary", size="lg")
        # Status banner and timing summary, updated by on_run_start/do_query.
        status_md = gr.Markdown("")
        timing_md = gr.Markdown("")
        # Results area: pipeline trace + answer + claim buttons on the left,
        # retrieved evidence on the right.
        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("#### ClimateRAG Pipeline")
                pipeline_md = gr.Markdown(
                    value="*Three-step ClimateRAG pipeline output will appear here after Run.*",
                    sanitize_html=False,
                )
                gr.Markdown("#### Generated Answer")
                answer_box = gr.Markdown(
                    value="*Answer will appear here after you click Run.*",
                    sanitize_html=False,
                )
                gr.Markdown("#### Claim Trace (Click to Highlight Evidence)")
                # At most three claim buttons; do_query returns updates for
                # them (they are in its outputs list below), so only claims
                # found in the answer become visible.
                with gr.Row():
                    claim_btn_1 = gr.Button("Claim 1", visible=False)
                    claim_btn_2 = gr.Button("Claim 2", visible=False)
                    claim_btn_3 = gr.Button("Claim 3", visible=False)
                    clear_highlight_btn = gr.Button("Clear Highlight", visible=True)
            with gr.Column(scale=1):
                gr.Markdown("#### Retrieved Evidence")
                evidence_box = gr.Markdown(
                    value="*Results will appear here after you click Run.*",
                    sanitize_html=False,
                )
        # Hidden per-session state populated by do_query: retrieved contexts
        # and the claim -> evidence trace consumed by the claim callbacks.
        contexts_state = gr.State([])
        claim_trace_state = gr.State([])
        # ---- Wiring ----
        # Swap the question placeholder / hint when the question type changes.
        doc_mode_radio.change(
            fn=on_doc_mode_change,
            inputs=[doc_mode_radio],
            outputs=[question_box, single_hint],
        )
        # Fill the question box with the mode-appropriate example question.
        use_example_btn.click(
            fn=lambda mode: PLACEHOLDER_SINGLE if mode == "Single-document" else PLACEHOLDER_MULTI,
            inputs=[doc_mode_radio],
            outputs=[question_box],
        )
        # Re-evaluate API-key box visibility whenever either model selection
        # changes, and once on initial page load (queue=False: instant UI
        # update, no job queue round-trip).
        gen_model_dd.change(
            fn=on_model_selection_change,
            inputs=[gen_model_dd, embed_model_dd],
            outputs=[openai_api_key_box, gemini_api_key_box],
            queue=False,
        )
        embed_model_dd.change(
            fn=on_model_selection_change,
            inputs=[gen_model_dd, embed_model_dd],
            outputs=[openai_api_key_box, gemini_api_key_box],
            queue=False,
        )
        demo.load(
            fn=on_model_selection_change,
            inputs=[gen_model_dd, embed_model_dd],
            outputs=[openai_api_key_box, gemini_api_key_box],
            queue=False,
        )
        # Two-stage submit: on_run_start immediately paints a "running"
        # status (unqueued), then do_query performs the actual RAG run and
        # fills every results component plus the session states.
        submit_btn.click(
            fn=on_run_start,
            outputs=[status_md, timing_md, pipeline_md],
            queue=False,
        ).then(
            fn=do_query,
            inputs=[
                question_box, doc_mode_radio, rag_mode_dd,
                embed_model_dd, gen_model_dd,
                openai_api_key_box, gemini_api_key_box, top_k_slider,
            ],
            outputs=[
                answer_box, evidence_box, status_md, timing_md, pipeline_md,
                contexts_state, claim_trace_state,
                claim_btn_1, claim_btn_2, claim_btn_3,
            ],
        )
        # Claim buttons re-render the evidence pane with claim i's supporting
        # passages highlighted (0-based claim index baked into each lambda).
        claim_btn_1.click(
            fn=lambda ctx, trace: on_claim_click(0, ctx, trace),
            inputs=[contexts_state, claim_trace_state],
            outputs=[evidence_box],
            queue=False,
        )
        claim_btn_2.click(
            fn=lambda ctx, trace: on_claim_click(1, ctx, trace),
            inputs=[contexts_state, claim_trace_state],
            outputs=[evidence_box],
            queue=False,
        )
        claim_btn_3.click(
            fn=lambda ctx, trace: on_claim_click(2, ctx, trace),
            inputs=[contexts_state, claim_trace_state],
            outputs=[evidence_box],
            queue=False,
        )
        # Restore the un-highlighted evidence rendering.
        clear_highlight_btn.click(
            fn=clear_claim_highlight,
            inputs=[contexts_state],
            outputs=[evidence_box],
            queue=False,
        )
    # ---- Tab 2: Document Library ----
    with gr.Tab("\U0001f4da Document Library"):
        gr.Markdown(
            "### Sustainability Report Collection\n"
            "Direct PDF download is disabled in this Space. "
            "Use the official GitHub link to access report files."
        )
        gr.Markdown(f"Report download link: [ClimRetrieve Reports]({REPORTS_GITHUB_URL})")
        # Static listing of available report names, rendered once at startup.
        report_names = build_report_name_list()
        gr.Markdown(render_report_names_md(report_names))
    # ==================== Tab 3: About ====================
    with gr.Tab("ℹ️ About"):
        gr.Markdown("""
### ClimateRAG — Climate Disclosure Retrieval-Augmented Generation for Evidence-based Question-Answering
Increasingly stringent global regulations require companies to provide detailed and auditable climate-related disclosures. These reports are often lengthy and visually complex, making manual analysis challenging for regulators and auditors who require precise evidence grounding rather than free-form answers.
ClimateRAG is a structured processing and reasoning framework designed for automated climate disclosure analysis. The system integrates hierarchical document chunking, an agent-based reasoning pipeline, and a claim extractor module to produce traceable, evidence-linked, and auditable outputs. It supports both single-document and multi-document analysis scenarios.
We additionally introduce a dataset of 367 expert-annotated question–answer pairs covering realistic regulatory and audit workflows. Experimental evaluation demonstrates the effectiveness and efficiency of the proposed framework for climate disclosure analysis.
The goal of ClimateRAG is to bridge Large Language Models with the rigorous standards required in regulatory auditing and sustainability reporting.
---
### Key Contributions
1. We develop ClimateRAG, the first system specifically designed for auditable and evidence-linked climate disclosure analysis with multi-document reasoning capability.
2. We construct a dataset of 367 annotated QA pairs spanning single-document and cross-document settings, aligned with real-world regulatory and auditing scenarios.
3. We conduct systematic evaluation to assess both retrieval and generation performance, validating the robustness and practical utility of the system.
---
### Project Website
https://cheng-tf.github.io/ClimateRAG/
""")
    # ---------- Custom Footer ----------
    gr.HTML(
        '<div class="custom-footer">'
        'Built with Gradio \u00b7 Powered by Climate Disclosure RAG \u00b7 \u00a9 2026'
        '</div>'
    )
# ======================== Launch ========================
if __name__ == "__main__":
    import inspect  # local import: only needed by this script entry point

    # Network / deployment knobs, all overridable via environment variables.
    server_name = os.getenv("APP_HOST", "0.0.0.0")
    server_port = int(os.getenv("APP_PORT", "7860"))
    root_path = os.getenv("APP_ROOT_PATH", "")
    share = os.getenv("APP_SHARE", "false").lower() in {"1", "true", "yes", "y"}
    # Only whitelist directories that actually exist on this host so Gradio
    # doesn't reject the launch over a missing path.
    allowed_paths = [
        p for p in [REPORTS_DIR, SCRIPT_DIR]
        if isinstance(p, str) and os.path.exists(p)
    ]
    launch_kwargs = dict(
        server_name=server_name,
        server_port=server_port,
        share=share,
        show_error=True,
        root_path=root_path if root_path else None,
        css=CUSTOM_CSS,
        theme=gr.themes.Soft(primary_hue="blue", secondary_hue="slate"),
    )
    if allowed_paths:
        launch_kwargs["allowed_paths"] = allowed_paths
    # BUG FIX: `css` and `theme` are gr.Blocks() *constructor* arguments, not
    # Blocks.launch() arguments; launch() has no such parameters and (absent
    # a **kwargs catch-all) raises TypeError, crashing the app at startup.
    # Filter launch_kwargs down to what this Gradio version's launch()
    # actually accepts, so the call is safe across versions.
    launch_sig = inspect.signature(demo.launch)
    accepts_any_kwarg = any(
        p.kind is inspect.Parameter.VAR_KEYWORD
        for p in launch_sig.parameters.values()
    )
    if not accepts_any_kwarg:
        dropped = {
            k: v for k, v in launch_kwargs.items()
            if k not in launch_sig.parameters
        }
        launch_kwargs = {
            k: v for k, v in launch_kwargs.items()
            if k in launch_sig.parameters
        }
        # Best-effort: re-attach the custom stylesheet on the Blocks instance
        # so the UI keeps its styling. NOTE(review): gr.Blocks keeps a `.css`
        # attribute read when the frontend config is built — confirm this
        # holds for the pinned Gradio version; the clean fix is to pass
        # css=/theme= to gr.Blocks(...) at construction time.
        if "css" in dropped and not getattr(demo, "css", None):
            demo.css = dropped["css"]
    demo.launch(**launch_kwargs)