""" Climate Disclosure RAG ================================== Gradio Web Interface for sustainability report QA. Launch: python app.py """ import os import sys import json import time import re import html from urllib.parse import quote import gradio_client.utils as _gcu _orig_json_schema_fn = _gcu._json_schema_to_python_type def _safe_json_schema_to_python_type(schema, defs=None): if isinstance(schema, bool): return "Any" return _orig_json_schema_fn(schema, defs) _gcu._json_schema_to_python_type = _safe_json_schema_to_python_type import gradio as gr SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) if SCRIPT_DIR not in sys.path: sys.path.insert(0, SCRIPT_DIR) from rag_app_backend import ( run_rag, run_trustworthy_step1, run_trustworthy_step2, run_trustworthy_step3_claims, API_GEN_MODEL_ALIASES, list_reports, HAS_GPU, OPENAI_EMBED_MODELS, MOCK_MODE, REPORTS_DIR, get_report_chunks, ) # ======================== Constants ======================== PLACEHOLDER_SINGLE = ( "For 2022 Microsoft Environmental Sustainability Report, " "do the environmental/sustainability targets set by the company " "reference external climate change adaptation goals/targets?" ) PLACEHOLDER_MULTI = ( 'For "Does the company encourage downstream partners to carry out climate-related ' 'risk assessments?", is Boeing 2023 Sustainability Report better than ' 'AT&T 2022 Sustainability Summary in disclosure quality?' ) REPORTS_GITHUB_URL = "https://github.com/tobischimanski/ClimRetrieve/tree/main/Reports" CPU_EMBED_MODELS = [ "BM25", "text-embedding-3-large", "text-embedding-3-small", "text-embedding-ada-002", ] GPU_EMBED_MODELS = [ "Qwen3-Embedding-0.6B", "Qwen3-Embedding-4B", ] EMBED_MODELS = (CPU_EMBED_MODELS + GPU_EMBED_MODELS) if HAS_GPU else CPU_EMBED_MODELS GPU_GEN_MODELS = [ "Qwen3-4B-Instruct-2507-FP8", ] API_GEN_MODELS = list(API_GEN_MODEL_ALIASES.keys()) API_GEN_MODEL = API_GEN_MODELS[0] if API_GEN_MODELS else "GPT-5-mini (API)" GEN_MODELS = GPU_GEN_MODELS + API_GEN_MODELS if HAS_GPU else API_GEN_MODELS OPENAI_EMBED_MODELS_SET = set(OPENAI_EMBED_MODELS) DEFAULT_OPENAI_API_KEY = ( os.getenv("OPENAI_API_KEY", "").strip() or os.getenv("OPENAI_API_KEY_88996", "").strip() ) DEFAULT_GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "").strip() def _is_gemini_generation_model(gen_model: str) -> bool: text = str(gen_model or "").strip().upper() return "GEMINI" in text # ======================== Helpers ======================== _pdf_page_count_cache = {} def _get_pdf_total_pages(report_name: str) -> int: if not report_name: return 1 if report_name in _pdf_page_count_cache: return _pdf_page_count_cache[report_name] pdf_path = os.path.join(REPORTS_DIR, report_name) total = 1 try: try: from pypdf import PdfReader except Exception: from PyPDF2 import PdfReader reader = PdfReader(pdf_path) total = max(1, len(reader.pages)) except Exception: total = 1 _pdf_page_count_cache[report_name] = total return total def _pdf_iframe(report_name: str, page: int = 1) -> str: pdf_path = os.path.abspath(os.path.join(REPORTS_DIR, report_name)).replace("\\", "/") pdf_url = f"/file={quote(pdf_path, safe='/:')}#page={max(1, int(page))}&view=FitH" return ( f'' ) def _format_evidence(contexts, highlight_keys=None, highlight_color: str = "#ff7043"): highlight_set = set(highlight_keys or []) medals = {0: "\U0001F947", 1: "\U0001F948", 2: "\U0001F949"} parts = [] for i, c in enumerate(contexts): badge = medals.get(i, f"#{i+1}") report_short = c["report"].replace(".pdf", "") score = c["score"] page = c.get("page", None) key = (str(c.get("report", "")), str(c.get("chunk_idx", ""))) text_body = str(c.get("text", ""))[:800] if key in highlight_set: safe_text = html.escape(text_body) text_body = ( f"
{safe_text}
" ) parts.append( f"### {badge} {report_short}\n" f"**Similarity:** {score:.4f} | " f"**Chunk:** {c['chunk_idx']}" + (f", **Page:** {page}" if page not in (None, "", "NA") else "") + "\n\n" f"{text_body}" ) return "\n\n---\n\n".join(parts) if parts else "No evidence retrieved." def _render_waiting(text: str) -> str: return f"
{text}
" def _preview_chunk_text(text: str, max_sentences: int = 2, max_chars: int = 260) -> str: raw = str(text or "").replace("\n", " ").replace("\r", " ").strip() if not raw: return "" sentences = [s.strip() for s in re.split(r"(?<=[.!?])\s+", raw) if s.strip()] if sentences: preview = " ".join(sentences[:max_sentences]).strip() else: preview = raw[:max_chars].strip() if len(preview) > max_chars: preview = preview[:max_chars].rstrip() if len(preview) < len(raw): return preview + "..." return preview def _render_step1_clusters_md(step1: dict) -> str: contexts = step1.get("contexts", []) if isinstance(step1, dict) else [] avg_sim = float(step1.get("average_similarity", 0.0) or 0.0) if isinstance(step1, dict) else 0.0 clusters = step1.get("clusters", []) if isinstance(step1, dict) else [] lines = [ "## STEP 1 - RETRIEVAL & EVIDENCE CLUSTERS", f"- Retrieved **{len(contexts)}** paragraphs", f"- Clustered into **{len(clusters)}** groups", f"- Average similarity: **{avg_sim:.2f}**", "- Note: Average similarity is the mean of retrieved Similarity scores.", "", ] if not clusters: lines.append("_No clusters returned._") return "\n".join(lines) for i, c in enumerate(clusters): cname = str(c.get("cluster_name", f"Cluster {i + 1}")).strip() csum = str(c.get("summary", "")).strip() lines.append(f"### Cluster {chr(65 + (i % 26))} - {cname}") if csum: lines.append(f"Summary: {csum}") evs = c.get("evidence", []) if isinstance(evs, list) and evs: for ev in evs: chunk_idx = ev.get("chunk_idx", "NA") preview = _preview_chunk_text(str(ev.get("text", "")), max_sentences=2, max_chars=320) lines.append(f"- Chunk {chunk_idx}: {preview}") else: lines.append("- No linked evidence items") lines.append("") return "\n".join(lines).strip() def _render_step2_claims_md(step2: dict) -> str: if not isinstance(step2, dict): return "## STEP 2 - ANSWER GENERATION\n_No generation output._" answer_ready = str(step2.get("answer", "")).strip() lines = ["## STEP 2 - ANSWER GENERATION", ""] if not answer_ready: lines.append("- Warning: no answer returned from Step 2.") return "\n".join(lines) lines.append("### Generated Answer") lines.append(_format_answer(answer_ready)) return "\n".join(lines) def _render_step2_summary_md(step2: dict) -> str: if not isinstance(step2, dict): return "## STEP 2 - ANSWER GENERATION\n_No generation output._" answer_ready = str(step2.get("answer", "")).strip() if not answer_ready: return "## STEP 2 - ANSWER GENERATION\n- Warning: no answer returned from Step 2." return "\n".join( [ "## STEP 2 - ANSWER GENERATION", "- Generated answer is shown in the **Generated Answer** panel.", ] ) def _render_step3_md(step3: dict) -> str: return "\n".join( [ "## STEP 3 - CLAIM EXTRACTOR", "", "Use the **Claim Trace** buttons below to inspect full claims and highlight linked evidence chunks.", ] ) CLAIM_COLORS = ["#ff7043", "#ffd54f", "#4fc3f7"] def _prepare_claim_trace(step3: dict): claim_links = step3.get("claim_links", []) if isinstance(step3, dict) else [] if not isinstance(claim_links, list): claim_links = [] trace = [] for row in claim_links[:3]: if not isinstance(row, dict): continue claim_text = str(row.get("claim", "")).strip() evs = row.get("evidence", []) if not isinstance(evs, list): evs = [] keys = [] chunk_refs = [] for ev in evs: if not isinstance(ev, dict): continue report = str(ev.get("report", "")) chunk_idx = str(ev.get("chunk_idx", "")) keys.append((report, chunk_idx)) if report or chunk_idx: report_short = report.replace(".pdf", "").strip() if report else "" if report_short and chunk_idx: chunk_refs.append(f"{report_short} chunk {chunk_idx}") elif chunk_idx: chunk_refs.append(f"chunk {chunk_idx}") elif report_short: chunk_refs.append(report_short) trace.append( { "label": claim_text or "Claim", "keys": keys, "chunk_refs": chunk_refs, "score": None, } ) return trace def _default_claim_button_updates(): return [ gr.update(value="Claim 1", visible=False), gr.update(value="Claim 2", visible=False), gr.update(value="Claim 3", visible=False), ] def _claim_button_updates(trace): updates = [] for i in range(3): if i < len(trace): label = str(trace[i].get("label", f"Claim {i+1}")).strip() score = trace[i].get("score", None) prefix = ["🟧", "🟨", "🟦"][i] refs = trace[i].get("chunk_refs", []) if isinstance(trace[i], dict) else [] ref_text = ", ".join(refs) if refs else "no chunk" if score is None: text = f"{prefix} Claim {i+1} [{ref_text}]: {label}" else: text = f"{prefix} Claim {i+1} ({score:.2f}) [{ref_text}]: {label}" updates.append(gr.update(value=text, visible=True)) else: updates.append(gr.update(value=f"Claim {i+1}", visible=False)) return updates def on_claim_click(claim_idx, contexts_state, claim_trace_state): contexts = contexts_state if isinstance(contexts_state, list) else [] trace = claim_trace_state if isinstance(claim_trace_state, list) else [] idx = int(claim_idx) if idx < 0 or idx >= len(trace): return _format_evidence(contexts) keys = trace[idx].get("keys", []) if isinstance(trace[idx], dict) else [] color = CLAIM_COLORS[idx % len(CLAIM_COLORS)] return _format_evidence(contexts, highlight_keys=keys, highlight_color=color) def clear_claim_highlight(contexts_state): contexts = contexts_state if isinstance(contexts_state, list) else [] return _format_evidence(contexts) def _md_cell(value) -> str: text = "" if value is None else str(value) text = text.replace("\n", " ").replace("\r", " ").strip() return text.replace("|", "\\|") def _md_table(rows, columns) -> str: if not rows: return "" header = "| " + " | ".join(columns) + " |" sep = "| " + " | ".join(["---"] * len(columns)) + " |" body = [] for row in rows: body.append("| " + " | ".join(_md_cell(row.get(c, "")) for c in columns) + " |") return "\n".join([header, sep] + body) def _truncate(text: str, max_len: int = 180) -> str: text = _md_cell(text) return text if len(text) <= max_len else text[: max_len - 3] + "..." def _shorten_text(text: str, max_len: int = 240) -> str: text = "" if text is None else str(text) text = text.replace("\n", " ").replace("\r", " ").strip() return text if len(text) <= max_len else text[: max_len - 3] + "..." def _pretty_company_name(raw: str) -> str: if not raw: return "" name = raw.replace("_", " ").strip() words = [w.upper() if w.lower() in {"esg"} else w.capitalize() for w in name.split()] return " ".join(words) def _escape(text) -> str: return html.escape("" if text is None else str(text)) def _extract_number_snippets(texts, max_items: int = 3): snippets = [] if not isinstance(texts, list): return snippets pattern = re.compile( r"\b\d+(?:\.\d+)?\s?(?:%|percent|billion|million|thousand|mt|tco2e|tons?|years?)\b", flags=re.IGNORECASE, ) for t in texts: s = str(t or "") for m in pattern.finditer(s): start = max(0, m.start() - 40) end = min(len(s), m.end() + 40) snippets.append(_shorten_text(s[start:end], 120)) if len(snippets) >= max_items: return snippets return snippets def _normalize_confidence_distribution(raw): if not isinstance(raw, dict): return None out = {} for key in ("high", "medium", "low"): v = raw.get(key, None) if v is None: continue try: if isinstance(v, str): s = v.strip() if s.endswith("%"): val = float(s[:-1]) / 100.0 else: val = float(s) else: val = float(v) if val > 1.0 and val <= 100.0: val = val / 100.0 val = max(0.0, min(1.0, val)) out[key] = val except Exception: continue if not out: return None for key in ("high", "medium", "low"): out.setdefault(key, 0.0) total = out["high"] + out["medium"] + out["low"] if total <= 0: return None # Normalize to sum=1 for stable UI display. out["high"] = out["high"] / total out["medium"] = out["medium"] / total out["low"] = out["low"] / total return out def _extract_confidence_distribution_from_text(text: str): raw = str(text or "") cleaned = raw m = re.search(r"confidence\s*distribution\s*:\s*(\{[^{}]+\})", raw, flags=re.IGNORECASE) if m: blob = m.group(1) parsed = None for cand in (blob, blob.replace("'", '"')): try: parsed = json.loads(cand) break except Exception: continue conf = _normalize_confidence_distribution(parsed) if conf: cleaned = raw.replace(m.group(0), "").strip() return conf, cleaned tmp = {} for k in ("high", "medium", "low"): mk = re.search(rf"\b{k}\b\s*[:=]\s*([0-9]+(?:\.[0-9]+)?)\s*(%)?", raw, flags=re.IGNORECASE) if not mk: continue num = float(mk.group(1)) if mk.group(2): num = num / 100.0 tmp[k] = num conf = _normalize_confidence_distribution(tmp if tmp else None) return conf, cleaned def _render_confidence_distribution(raw) -> str: conf = _normalize_confidence_distribution(raw) if not conf: return "" def _row(label: str, key: str, cls: str) -> str: pct = conf[key] * 100.0 return ( "
" f"
{label}
" "
" f"
" "
" f"
{pct:.1f}%
" "
" ) return ( "
" "

Confidence Distribution

" f"{_row('High', 'high', 'conf-high')}" f"{_row('Medium', 'medium', 'conf-medium')}" f"{_row('Low', 'low', 'conf-low')}" "
" ) def _attach_confidence_block(card_html: str, parsed: dict) -> str: conf_html = _render_confidence_distribution(parsed.get("confidence_distribution")) if not conf_html: return card_html text = str(card_html or "") if text.rstrip().endswith(""): return text.rstrip()[:-6] + conf_html + "" return text + conf_html def _normalize_skill_for_render(parsed: dict) -> str: skill = str(parsed.get("skill", "")).strip().lower() if "trend" in skill and "quant" in skill: return "Trend & Quant Comparator" if "attainment" in skill or ("delta" in skill and "benchmark" in skill): return "Target Attainment & Delta Benchmark" if "compliance" in skill and "check" in skill: return "Compliance Checklist" if "dimension" in skill and "extract" in skill: return "Dimension Extractor" if "contradiction" in skill or "consistency" in skill: return "Contradiction/Consistency Check" if "consensus" in skill or "portfolio" in skill or "count" in skill: return "Consensus/Count (Portfolio Statistics)" if "comparative" in skill or "table" in skill: return "Comparative Table Builder" # Fallback by schema keys when model misses `skill`. if isinstance(parsed.get("required_checks"), list) and isinstance(parsed.get("reports"), list): return "Compliance Checklist" if isinstance(parsed.get("counts"), dict) and isinstance(parsed.get("per_report"), list): return "Consensus/Count (Portfolio Statistics)" if isinstance(parsed.get("checks"), list) and isinstance(parsed.get("scores"), dict): return "Contradiction/Consistency Check" reports = parsed.get("reports") if isinstance(reports, list) and reports: first = reports[0] if isinstance(reports[0], dict) else {} if isinstance(first.get("benchmarks"), list): return "Target Attainment & Delta Benchmark" if isinstance(first.get("quant_metrics"), list) and "strength_score" in first: return "Trend & Quant Comparator" if isinstance(first.get("bucket_counts"), dict): return "Dimension Extractor" if "maturity_level" in first or "comparison_metrics" in parsed: return "Comparative Table Builder" if str(parsed.get("report", "")).strip() and any(k in parsed for k in ("maturity_level", "key_evidence", "quant_metrics", "comparison_metrics", "year")): return "Comparative Table Builder" return "" def _render_quant_metrics_matrix(reports: list, report_key: str = "report", metrics_key: str = "quant_metrics") -> str: if not isinstance(reports, list) or not reports: return "" cols = [] metric_map = {} for r in reports: if not isinstance(r, dict): continue name = str(r.get(report_key, "Unknown")).strip() or "Unknown" cols.append(name) qms = r.get(metrics_key, []) if not isinstance(qms, list): continue for m in qms: if not isinstance(m, dict): continue metric = str(m.get("metric", "")).strip() if not metric: continue value = m.get("value", None) unit = m.get("unit", None) period = m.get("period", None) if value is None or str(value).strip() == "": cell = "N/A" else: cell = str(value) if unit not in (None, ""): cell += f" {unit}" if period not in (None, ""): cell += f" ({period})" metric_map.setdefault(metric, {})[name] = cell if not metric_map: return "
No explicit quantitative metrics found in the output.
" head_cols = "".join(f"{_escape(c)}" for c in cols) body_rows = [] for metric in sorted(metric_map.keys()): row_cells = "".join(f"{_escape(metric_map[metric].get(c, 'N/A'))}" for c in cols) body_rows.append(f"{_escape(metric)}{row_cells}") return ( "

Quantitative Comparison

" "
" "" f"{head_cols}" f"{''.join(body_rows)}" "
Metric
" ) def _render_maturity_comparison_html(parsed: dict) -> str: mc = parsed.get("maturity_comparison") if not isinstance(mc, dict) or not mc: return "" companies = list(mc.keys()) rows_html = [] level_cells = [] evidence_cells = [] quant_cells = [] for k in companies: item = mc.get(k, {}) if isinstance(mc.get(k), dict) else {} level = str(item.get("maturity_level", "unknown")).strip() low = level.lower() suffix = " x" if low == "insufficient" else "" badge_cls = f"level-{low}" if low in {"high", "moderate", "low", "insufficient"} else "level-unknown" level_cells.append(f'{_escape(level + suffix)}') evidence = item.get("evidence", []) if isinstance(evidence, list) and evidence: bullets = "".join(f"
  • {_escape(_shorten_text(e, 180))}
  • " for e in evidence[:2]) evidence_cells.append(f"") else: evidence_cells.append("N/A x") rationale = item.get("rationale", "") snippets = _extract_number_snippets((evidence if isinstance(evidence, list) else []) + [rationale], max_items=3) if snippets: quant_cells.append("") else: quant_cells.append("No numeric metric found") rows_html.append("Maturity Level" + "".join(level_cells) + "") rows_html.append("Key Evidence" + "".join(evidence_cells) + "") rows_html.append("Quant Metrics" + "".join(quant_cells) + "") header_cells = "".join(f"{_escape(_pretty_company_name(c))}" for c in companies) conclusion = parsed.get("conclusion", "") conclusion_html = "" if isinstance(conclusion, str) and conclusion.strip(): conclusion_html = ( "
    " "

    Conclusion

    " f"

    {_escape(conclusion.strip())}

    " "
    " ) return ( "
    " "

    Company Maturity Comparison

    " "
    " "" "" f"{header_cells}" "" "" f"{''.join(rows_html)}" "
    Attribute
    " f"{conclusion_html}" "
    " ) def _render_comparative_table_builder_html(parsed: dict) -> str: reports = parsed.get("reports", []) if not isinstance(reports, list) or not reports: return "" header_cells = "".join(f"{_escape(r.get('report', 'Unknown'))}" for r in reports if isinstance(r, dict)) if not header_cells: return "" maturity_cells = [] evidence_cells = [] for r in reports: if not isinstance(r, dict): continue level = str(r.get("maturity_level", "unknown")).strip() low = level.lower() suffix = " x" if low == "insufficient" else "" cls = f"level-{low}" if low in {"high", "moderate", "low", "insufficient"} else "level-unknown" maturity_cells.append(f'{_escape(level + suffix)}') evidence = r.get("key_evidence", []) if isinstance(evidence, list) and evidence: bullets = "".join(f"
  • {_escape(_shorten_text(e, 180))}
  • " for e in evidence[:2]) evidence_cells.append(f"") else: evidence_cells.append("N/A") compare_metrics = parsed.get("comparison_metrics", []) compare_html = "" if isinstance(compare_metrics, list) and compare_metrics: chips = "".join(f"{_escape(m)}" for m in compare_metrics[:8]) compare_html = f"
    Compared metrics: {chips}
    " matrix_html = _render_quant_metrics_matrix(reports, report_key="report", metrics_key="quant_metrics") conclusion = str(parsed.get("conclusion", "")).strip() conclusion_html = f"

    Conclusion

    {_escape(conclusion)}

    " if conclusion else "" return ( "
    " "

    Comparative Table Builder

    " "
    " f"{header_cells}" "" f"{''.join(maturity_cells)}" f"{''.join(evidence_cells)}" "
    Attribute
    Maturity Level
    Key Evidence
    " f"{compare_html}" f"{matrix_html}" f"{conclusion_html}" "
    " ) def _format_num(v) -> str: if v in (None, ""): return "N/A" try: f = float(v) if f.is_integer(): return str(int(f)) return f"{f:.2f}" except Exception: return str(v) def _format_pct(v) -> str: if v in (None, ""): return "N/A" try: return f"{float(v):.2f}%" except Exception: s = str(v) return s if s.endswith("%") else f"{s}%" def _trend_badge(direction: str) -> str: d = str(direction or "unknown").strip().lower() icon = {"up": "up", "down": "down", "flat": "flat"}.get(d, "unknown") return f"{_escape(d)}" def _render_trend_metric_cell(metric_obj: dict) -> str: if not isinstance(metric_obj, dict): return "N/A" value = _format_num(metric_obj.get("value")) unit = metric_obj.get("unit") period = metric_obj.get("period") value_line = value if unit not in (None, "") and value != "N/A": value_line = f"{value} {unit}" if period not in (None, "") and value_line != "N/A": value_line = f"{value_line} ({period})" intensity = _format_num(metric_obj.get("intensity")) attainment = _format_pct(metric_obj.get("attainment_rate")) change = _format_num(metric_obj.get("change_magnitude")) note = _shorten_text(metric_obj.get("note", ""), 90) note_html = "" if not note else f"
    {_escape(note)}
    " return ( f"
    Value: {_escape(value_line)}
    " f"
    Intensity: {_escape(intensity)}
    " f"
    Attainment: {_escape(attainment)}
    " f"
    Change: {_escape(change)}
    " f"
    Trend: {_trend_badge(metric_obj.get('trend_direction', 'unknown'))}
    " f"{note_html}" ) def _render_trend_quant_comparator_html(parsed: dict) -> str: reports = parsed.get("reports", []) if not isinstance(reports, list) or not reports: return "" report_names = [str(r.get("report", "Unknown")) for r in reports if isinstance(r, dict)] if not report_names: return "" header = "".join(f"{_escape(n)}" for n in report_names) strength_cells = [] evidence_cells = [] metric_map = {} for r in reports: if not isinstance(r, dict): continue rn = str(r.get("report", "Unknown")) strength_cells.append(f"{_escape(_format_num(r.get('strength_score')))}") ev = r.get("key_evidence", []) if isinstance(ev, list) and ev: bullets = "".join(f"
  • {_escape(_shorten_text(x, 130))}
  • " for x in ev[:3]) evidence_cells.append(f"") else: evidence_cells.append("N/A") for m in r.get("quant_metrics", []) if isinstance(r.get("quant_metrics"), list) else []: if not isinstance(m, dict): continue metric = str(m.get("metric", "")).strip() if not metric: continue metric_map.setdefault(metric, {})[rn] = m metric_rows = [] for metric in sorted(metric_map.keys()): cells = [] for rn in report_names: cells.append(f"{_render_trend_metric_cell(metric_map[metric].get(rn, {}))}") metric_rows.append(f"{_escape(metric)}{''.join(cells)}") if not metric_rows: metric_rows = [f"Quant MetricsNo quantitative metrics returned."] highlights = parsed.get("metric_highlights", []) highlight_html = "" if isinstance(highlights, list) and highlights: chips = "".join(f"{_escape(_shorten_text(x, 60))}" for x in highlights[:10]) highlight_html = f"
    Metric Highlights: {chips}
    " conclusion = str(parsed.get("conclusion", "")).strip() conclusion_html = f"

    Conclusion

    {_escape(conclusion)}

    " if conclusion else "" return ( "
    " "

    Trend & Quant Comparator

    " "
    " f"{header}" "" f"{''.join(strength_cells)}" f"{''.join(evidence_cells)}" f"{''.join(metric_rows)}" "
    Attribute
    Strength Score
    Key Evidence
    " f"{highlight_html}" f"{conclusion_html}" "
    " ) def _render_benchmark_metric_cell(metric_obj: dict) -> str: if not isinstance(metric_obj, dict): return "N/A" unit = metric_obj.get("unit") base_v = _format_num(metric_obj.get("baseline_value")) cur_v = _format_num(metric_obj.get("current_value")) tgt_v = _format_num(metric_obj.get("target_value")) base_p = metric_obj.get("baseline_period") cur_p = metric_obj.get("current_period") tgt_p = metric_obj.get("target_period") def _with_unit(v): if v == "N/A": return v return f"{v} {unit}" if unit not in (None, "") else v baseline = _with_unit(base_v) current = _with_unit(cur_v) target = _with_unit(tgt_v) baseline = f"{baseline} ({base_p})" if base_p not in (None, "") and baseline != "N/A" else baseline current = f"{current} ({cur_p})" if cur_p not in (None, "") and current != "N/A" else current target = f"{target} ({tgt_p})" if tgt_p not in (None, "") and target != "N/A" else target attainment = _format_pct(metric_obj.get("attainment_rate")) delta_abs = _format_num(metric_obj.get("delta_abs")) delta_pct = _format_pct(metric_obj.get("delta_percent")) intensity = _format_num(metric_obj.get("intensity")) note = _shorten_text(metric_obj.get("note", ""), 90) note_html = "" if not note else f"
    {_escape(note)}
    " return ( f"
    Baseline: {_escape(baseline)}
    " f"
    Current: {_escape(current)}
    " f"
    Target: {_escape(target)}
    " f"
    Attainment: {_escape(attainment)}
    " f"
    Delta (abs/%): {_escape(delta_abs)} / {_escape(delta_pct)}
    " f"
    Intensity: {_escape(intensity)}
    " f"
    Trend: {_trend_badge(metric_obj.get('trend_direction', 'unknown'))}
    " f"{note_html}" ) def _render_target_attainment_delta_html(parsed: dict) -> str: reports = parsed.get("reports", []) if not isinstance(reports, list) or not reports: return "" report_names = [str(r.get("report", "Unknown")) for r in reports if isinstance(r, dict)] if not report_names: return "" header = "".join(f"{_escape(n)}" for n in report_names) strength_cells = [] evidence_cells = [] metric_map = {} for r in reports: if not isinstance(r, dict): continue rn = str(r.get("report", "Unknown")) strength = str(r.get("overall_strength", "insufficient")).strip() low = strength.lower() suffix = " x" if low == "insufficient" else "" cls = f"level-{low}" if low in {"high", "moderate", "low", "insufficient"} else "level-unknown" strength_cells.append(f'{_escape(strength + suffix)}') ev = r.get("key_evidence", []) if isinstance(ev, list) and ev: bullets = "".join(f"
  • {_escape(_shorten_text(x, 130))}
  • " for x in ev[:3]) evidence_cells.append(f"") else: evidence_cells.append("N/A") for m in r.get("benchmarks", []) if isinstance(r.get("benchmarks"), list) else []: if not isinstance(m, dict): continue metric = str(m.get("metric", "")).strip() if not metric: continue metric_map.setdefault(metric, {})[rn] = m metric_rows = [] for metric in sorted(metric_map.keys()): cells = [] for rn in report_names: cells.append(f"{_render_benchmark_metric_cell(metric_map[metric].get(rn, {}))}") metric_rows.append(f"{_escape(metric)}{''.join(cells)}") if not metric_rows: metric_rows = [f"BenchmarksNo benchmark metrics returned."] leaderboard = parsed.get("leaderboard", []) leaderboard_html = "" if isinstance(leaderboard, list) and leaderboard: rows = [] for item in leaderboard[:6]: if not isinstance(item, dict): continue rows.append( "" f"{_escape(item.get('report', ''))}" f"{_escape(_format_num(item.get('score')))}" f"{_escape(_shorten_text(item.get('reason', ''), 120))}" "" ) if rows: leaderboard_html = ( "
    " "" "" f"{''.join(rows)}" "
    Leaderboard ReportScoreReason
    " ) conclusion = str(parsed.get("conclusion", "")).strip() conclusion_html = f"

    Conclusion

    {_escape(conclusion)}

    " if conclusion else "" return ( "
    " "

    Target Attainment & Delta Benchmark

    " "
    " f"{header}" "" f"{''.join(strength_cells)}" f"{''.join(evidence_cells)}" f"{''.join(metric_rows)}" "
    Attribute
    Overall Strength
    Key Evidence
    " f"{leaderboard_html}" f"{conclusion_html}" "
    " ) def _render_compliance_checklist_html(parsed: dict) -> str: reports = parsed.get("reports", []) if not isinstance(reports, list) or not reports: return "" required_items = parsed.get("required_checks", []) if not isinstance(required_items, list): required_items = [] report_names = [str(r.get("report", "Unknown")) for r in reports if isinstance(r, dict)] if not report_names: return "" header = "".join(f"{_escape(n)}" for n in report_names) summary_cells = [] evidence_cells = [] for r in reports: if not isinstance(r, dict): continue s = r.get("summary", {}) if isinstance(r.get("summary"), dict) else {} summary_cells.append( "" f"pass={_escape(s.get('pass', 0))}, partial={_escape(s.get('partial', 0))}, fail={_escape(s.get('fail', 0))}" f"
    completion={_escape(s.get('completion_rate', 'N/A'))}" "" ) ev = r.get("key_evidence", []) if isinstance(ev, list) and ev: bullets = "".join(f"
  • {_escape(_shorten_text(x, 130))}
  • " for x in ev[:3]) evidence_cells.append(f"") else: evidence_cells.append("N/A") item_rows = [] for item in required_items: cells = [] for r in reports: status = "insufficient" quant = "N/A" note = "" checks = r.get("checks", []) if isinstance(r, dict) else [] if isinstance(checks, list): for c in checks: if not isinstance(c, dict): continue if str(c.get("item", "")).strip().lower() == str(item).strip().lower(): status = str(c.get("status", "insufficient")) qv = c.get("quant_value", None) qu = c.get("quant_unit", None) quant = "N/A" if qv in (None, "") else f"{qv}{'' if qu in (None, '') else ' ' + str(qu)}" note = _shorten_text(c.get("note", ""), 90) break mark = " x" if status.lower() in {"fail", "insufficient"} else "" cells.append(f"{_escape(status + mark)}
    {_escape(quant)}
    {_escape(note)}") item_rows.append(f"{_escape(item)}{''.join(cells)}") matrix_html = _render_quant_metrics_matrix(reports, report_key="report", metrics_key="quant_metrics") conclusion = str(parsed.get("conclusion", "")).strip() conclusion_html = f"

    Conclusion

    {_escape(conclusion)}

    " if conclusion else "" return ( "
    " "

    Compliance Checklist

    " "
    " f"{header}" "" f"{''.join(summary_cells)}" f"{''.join(evidence_cells)}" f"{''.join(item_rows)}" "
    Checklist Item
    Summary
    Key Evidence
    " f"{matrix_html}" f"{conclusion_html}" "
    " ) def _render_dimension_extractor_html(parsed: dict) -> str: reports = parsed.get("reports", []) if not isinstance(reports, list) or not reports: return "" report_names = [str(r.get("report", "Unknown")) for r in reports if isinstance(r, dict)] if not report_names: return "" header = "".join(f"{_escape(n)}" for n in report_names) rows = [] for b in ["Process", "Input", "Output", "Outcome", "Governance", "Risk"]: cells = [] for r in reports: bc = r.get("bucket_counts", {}) if isinstance(r, dict) and isinstance(r.get("bucket_counts"), dict) else {} cells.append(f"{_escape(bc.get(b, 0))}") rows.append(f"{_escape(b)}{''.join(cells)}") coverage_cells = [] evidence_cells = [] for r in reports: level = str(r.get("coverage_level", "unknown")) if isinstance(r, dict) else "unknown" low = level.lower() suffix = " x" if low == "insufficient" else "" cls = f"level-{low}" if low in {"high", "moderate", "low", "insufficient"} else "level-unknown" coverage_cells.append(f'{_escape(level + suffix)}') ev = r.get("key_evidence", []) if isinstance(r, dict) else [] if isinstance(ev, list) and ev: bullets = "".join(f"
  • {_escape(_shorten_text(x, 130))}
  • " for x in ev[:3]) evidence_cells.append(f"") else: evidence_cells.append("N/A") matrix_html = _render_quant_metrics_matrix(reports, report_key="report", metrics_key="quant_metrics") conclusion = str(parsed.get("conclusion", "")).strip() conclusion_html = f"

    Conclusion

    {_escape(conclusion)}

    " if conclusion else "" return ( "
    " "

    Dimension Extractor

    " "
    " f"{header}" "" f"{''.join(rows)}" f"{''.join(coverage_cells)}" f"{''.join(evidence_cells)}" "
    Bucket
    Coverage Level
    Key Evidence
    " f"{matrix_html}" f"{conclusion_html}" "
    " ) def _render_consistency_check_html(parsed: dict) -> str: checks = parsed.get("checks", []) scores = parsed.get("scores", {}) if isinstance(parsed.get("scores"), dict) else {} if not isinstance(checks, list): checks = [] check_rows = [] for c in checks: if not isinstance(c, dict): continue result = str(c.get("result", "insufficient")).strip() mark = " x" if result.lower() in {"inconsistent", "insufficient"} else "" check_rows.append( "" f"{_escape(c.get('rule', ''))}" f"{_escape(result + mark)}" f"{_escape(_shorten_text(c.get('note', ''), 180))}" "" ) check_rows_html = "".join(check_rows) if check_rows else "No checks returned." key_evidence = parsed.get("key_evidence", []) key_evidence_html = "" if isinstance(key_evidence, list) and key_evidence: bullets = "".join(f"
  • {_escape(_shorten_text(x, 180))}
  • " for x in key_evidence[:6]) key_evidence_html = ( "
    " "" f"" "
    Key Evidence
      {bullets}
    " ) conclusion = str(parsed.get("conclusion", "")).strip() conclusion_html = f"

    Conclusion

    {_escape(conclusion)}

    " if conclusion else "" return ( "
    " "

    Contradiction / Consistency Check

    " "
    " "" f"{check_rows_html}" "
    RuleResultNote
    " f"{key_evidence_html}" "
    " "" "" f"" f"" f"" f"" "
    Score ItemValue
    consistent{_escape(scores.get('consistent', 0))}
    inconsistent{_escape(scores.get('inconsistent', 0))}
    insufficient{_escape(scores.get('insufficient', 0))}
    consistency_rate{_escape(scores.get('consistency_rate', 'N/A'))}
    " f"{conclusion_html}" "
    " ) def _render_consensus_count_html(parsed: dict) -> str: counts = parsed.get("counts", {}) if isinstance(parsed.get("counts"), dict) else {} percentages = parsed.get("percentages", {}) if isinstance(parsed.get("percentages"), dict) else {} per_report = parsed.get("per_report", []) if not isinstance(per_report, list): per_report = [] ev_map = {} key_evidence_by_report = parsed.get("key_evidence_by_report", []) if isinstance(key_evidence_by_report, list): for row in key_evidence_by_report: if not isinstance(row, dict): continue report = str(row.get("report", "")).strip() ev = row.get("key_evidence", []) if report and isinstance(ev, list): ev_map[report] = ev report_rows = [] for r in per_report: if not isinstance(r, dict): continue label = str(r.get("label", "insufficient")) mark = " x" if label.lower() in {"missing", "insufficient"} else "" report = str(r.get("report", "")) ev = ev_map.get(report, r.get("key_evidence", [])) ev_html = "N/A" if isinstance(ev, list) and ev: ev_html = "" report_rows.append("{}{}{}".format(_escape(report), _escape(label + mark), ev_html)) report_rows_html = "".join(report_rows) if report_rows else "No report labels returned." matrix_html = _render_quant_metrics_matrix(per_report, report_key="report", metrics_key="quant_metrics") consensus_items = parsed.get("consensus_items", []) outliers = parsed.get("outliers", []) consensus_html = "".join(f"
  • {_escape(_shorten_text(x, 140))}
  • " for x in consensus_items[:6]) if isinstance(consensus_items, list) else "" outliers_html = "".join(f"
  • {_escape(_shorten_text(x, 140))}
  • " for x in outliers[:6]) if isinstance(outliers, list) else "" conclusion = str(parsed.get("conclusion", "")).strip() conclusion_html = f"

    Conclusion

    {_escape(conclusion)}

    " if conclusion else "" return ( "
    " "

    Consensus / Count (Portfolio Statistics)

    " "
    " "" "" f"" f"" f"" f"" "
    Count ItemValue
    explicit{_escape(counts.get('explicit', 0))} ({_escape(percentages.get('explicit', 'N/A'))}%)
    partial{_escape(counts.get('partial', 0))} ({_escape(percentages.get('partial', 'N/A'))}%)
    missing{_escape(counts.get('missing', 0))} ({_escape(percentages.get('missing', 'N/A'))}%)
    total{_escape(counts.get('total', len(per_report)))}
    " "
    " "" f"{report_rows_html}" "
    ReportLabelKey Evidence
    " f"{matrix_html}" "
    " f"

    Consensus Items

      {consensus_html or '
    • None
    • '}
    " f"

    Outliers

      {outliers_html or '
    • None
    • '}
    " "
    " f"{conclusion_html}" "
    " ) def _coerce_payload_for_ui(payload): if isinstance(payload, list): if payload and all(isinstance(x, dict) and str(x.get("report", "")).strip() for x in payload): return { "skill": "Comparative Table Builder", "reports": payload, } return payload if not isinstance(payload, dict): return payload if isinstance(payload.get("reports"), list) or isinstance(payload.get("maturity_comparison"), dict): return payload is_single_report_record = ( str(payload.get("report", "")).strip() != "" and any(k in payload for k in ("maturity_level", "key_evidence", "quant_metrics", "comparison_metrics", "year")) and not any(k in payload for k in ("answer", "explanation", "evidence_ids", "rows", "ranking")) ) if not is_single_report_record: return payload report_item = { "report": payload.get("report", "Unknown"), "year": payload.get("year", None), "maturity_level": payload.get("maturity_level", "unknown"), "key_evidence": payload.get("key_evidence", []) if isinstance(payload.get("key_evidence"), list) else [], "quant_metrics": payload.get("quant_metrics", []) if isinstance(payload.get("quant_metrics"), list) else [], } normalized = { "skill": str(payload.get("skill", "")).strip() or "Comparative Table Builder", "reports": [report_item], } if "dimension" in payload: normalized["dimension"] = payload.get("dimension") if isinstance(payload.get("comparison_metrics"), list): normalized["comparison_metrics"] = payload.get("comparison_metrics") if "conclusion" in payload: normalized["conclusion"] = payload.get("conclusion") if "confidence_distribution" in payload: normalized["confidence_distribution"] = payload.get("confidence_distribution") return normalized def _render_skill_specific_html(parsed: dict) -> str: legacy = _render_maturity_comparison_html(parsed) if legacy: return _attach_confidence_block(legacy, parsed) skill = _normalize_skill_for_render(parsed) html = "" if skill == "Trend & Quant Comparator": html = _render_trend_quant_comparator_html(parsed) if skill == "Target Attainment & Delta Benchmark": html = _render_target_attainment_delta_html(parsed) if skill == "Comparative Table Builder": html = _render_comparative_table_builder_html(parsed) if skill == "Compliance Checklist": html = _render_compliance_checklist_html(parsed) if skill == "Dimension Extractor": html = _render_dimension_extractor_html(parsed) if skill == "Contradiction/Consistency Check": html = _render_consistency_check_html(parsed) if skill == "Consensus/Count (Portfolio Statistics)": html = _render_consensus_count_html(parsed) if not html: return "" return _attach_confidence_block(html, parsed) def _extract_json_payload(text: str): """Extract a JSON value from mixed model output text.""" if not text: return None decoder = json.JSONDecoder() # 1) Whole string is JSON. try: return json.loads(text) except Exception: pass # 2) JSON fenced blocks. for block in re.findall(r"```(?:json)?\s*([\s\S]*?)```", text, flags=re.IGNORECASE): try: return json.loads(block.strip()) except Exception: continue # 3) Marker-based extraction. marker = "Final Answer in JSON:" if marker in text: tail = text.split(marker, 1)[1].strip() if tail: try: obj, _ = decoder.raw_decode(tail) return obj except Exception: pass # 4) Scan every '{' and try raw_decode on that suffix. candidates = [] for i, ch in enumerate(text): if ch != "{": continue try: obj, end = decoder.raw_decode(text[i:]) consumed = text[i:i + end] candidates.append((obj, len(consumed))) except Exception: continue if not candidates: return None # Prefer dicts with known answer schemas, else the largest parsed candidate. def _score(item): obj, consumed_len = item schema_bonus = 0 if isinstance(obj, dict): if any(k in obj for k in ("dimension", "rows", "ranking")): schema_bonus += 10 if any(k in obj for k in ("answer", "explanation", "evidence_ids")): schema_bonus += 10 if "maturity_comparison" in obj: schema_bonus += 15 if any(k in obj for k in ("reports", "counts", "checks", "bucket_counts", "per_report")): schema_bonus += 12 if "skill" in obj: schema_bonus += 8 if "confidence_distribution" in obj: schema_bonus += 6 return (schema_bonus, consumed_len) candidates.sort(key=_score, reverse=True) return candidates[0][0] def _format_answer(answer: str) -> str: if not answer: return "" parsed = _coerce_payload_for_ui(_extract_json_payload(answer)) if isinstance(parsed, dict): low_keys = {str(k).strip().lower() for k in parsed.keys()} if low_keys and low_keys.issubset({"high", "medium", "low"}): conf = _normalize_confidence_distribution(parsed) if conf: cleaned = re.sub( r"confidence\s*distribution\s*:\s*\{[^{}]+\}", "", str(answer), flags=re.IGNORECASE, ).strip() body = _escape(cleaned).replace("\n", "
    ") return f"
    {body}
    {_render_confidence_distribution(conf)}" if parsed is None: conf, cleaned = _extract_confidence_distribution_from_text(answer) conf_html = _render_confidence_distribution(conf) if conf_html: body = _escape(cleaned).replace("\n", "
    ") return f"
    {body}
    {conf_html}" return answer if not isinstance(parsed, dict): return f"```json\n{json.dumps(parsed, ensure_ascii=False, indent=2)}\n```" skill_html = _render_skill_specific_html(parsed) if skill_html: return skill_html parts = [] dimension = parsed.get("dimension") if dimension: parts.append(f"**Dimension:** {_md_cell(dimension)}") rows = parsed.get("rows") if isinstance(rows, list) and rows: table_rows = [] for item in rows: if not isinstance(item, dict): continue table_rows.append({ "Report": item.get("report", ""), "Year": item.get("year", ""), "Status": item.get("disclosure_status", ""), "Key Points": len(item.get("key_points") or []), "Evidence": len(item.get("evidence_chunks") or []), }) if table_rows: parts.append("### Comparison") parts.append(_md_table(table_rows, ["Report", "Year", "Status", "Key Points", "Evidence"])) ranking = parsed.get("ranking") if isinstance(ranking, list) and ranking: ranking_rows = [] for item in ranking: if not isinstance(item, dict): continue ranking_rows.append({ "Rank": item.get("rank", ""), "Report": item.get("report", ""), "Rationale": _truncate(item.get("rationale", "")), }) if ranking_rows: parts.append("### Ranking") parts.append(_md_table(ranking_rows, ["Rank", "Report", "Rationale"])) conclusion = parsed.get("conclusion") if isinstance(conclusion, str) and conclusion.strip(): parts.append("### Conclusion") parts.append(_md_cell(conclusion)) # Generic JSON-answer schema fallback. if "answer" in parsed or "explanation" in parsed or "evidence_ids" in parsed: ans = parsed.get("answer", "") exp = parsed.get("explanation", "") if ans: parts.append("### Answer") parts.append(_md_cell(ans)) if exp: parts.append("### Explanation") parts.append(_md_cell(exp)) ev_ids = parsed.get("evidence_ids") if isinstance(ev_ids, list) and ev_ids: parts.append(f"### Retrieved Sources Count\n{len(ev_ids)}") # Show remaining scalar fields in a compact table. skip_keys = {"answer", "explanation", "evidence_ids", "dimension", "rows", "ranking"} extra_rows = [] for k, v in parsed.items(): if k in skip_keys: continue if isinstance(v, (str, int, float, bool)) or v is None: extra_rows.append({"Field": k, "Value": _md_cell(v)}) if extra_rows: parts.append("### Extra Fields") parts.append(_md_table(extra_rows, ["Field", "Value"])) conf_html = _render_confidence_distribution(parsed.get("confidence_distribution")) if conf_html: parts.append(conf_html) if parts: return "\n\n".join(parts) return f"```json\n{json.dumps(parsed, ensure_ascii=False, indent=2)}\n```" # ======================== Handlers ======================== def on_doc_mode_change(doc_mode): if doc_mode == "Single-document": return ( gr.update(placeholder=PLACEHOLDER_SINGLE, value=""), gr.update( value=( '

    ' '\U0001f4a1 Tip: We recommend prefixing your question with the report name, ' 'e.g. "For [Report Name], does the company ...?"' '

    ' ), visible=True, ), ) return ( gr.update(placeholder=PLACEHOLDER_MULTI, value=""), gr.update( value=( '

    ' '\U0001f4a1 Tip: We recommend prefixing your question with the report name, ' 'e.g. "For [Report 1 Name] and [Report 2 Name], does ...?"' '

    ' ), visible=True, ), ) def on_model_selection_change(gen_model, embed_model): use_api_gen = "(API)" in str(gen_model) use_gemini_gen = use_api_gen and _is_gemini_generation_model(gen_model) needs_openai_key = (str(embed_model) in OPENAI_EMBED_MODELS_SET) or (use_api_gen and not use_gemini_gen) needs_gemini_key = use_gemini_gen return ( gr.update(visible=needs_openai_key), gr.update(visible=needs_gemini_key), ) def on_report_select(report_name): if not report_name: return ( "

    No report selected.

    ", 1, 1, "Page: 1 / 1", gr.update(interactive=False), gr.update(interactive=False), ) total = _get_pdf_total_pages(report_name) return ( _pdf_iframe(report_name, page=1), 1, total, f"Page: 1 / {total}", gr.update(interactive=False), gr.update(interactive=total > 1), ) def on_prev_page(report_name, current_page, total_pages): if not report_name: return ( "

    No report selected.

    ", 1, 1, "Page: 1 / 1", gr.update(interactive=False), gr.update(interactive=False), ) total = max(1, int(total_pages or 1)) page = max(1, int(current_page or 1) - 1) return ( _pdf_iframe(report_name, page=page), page, total, f"Page: {page} / {total}", gr.update(interactive=page > 1), gr.update(interactive=page < total), ) def on_next_page(report_name, current_page, total_pages): if not report_name: return ( "

    No report selected.

    ", 1, 1, "Page: 1 / 1", gr.update(interactive=False), gr.update(interactive=False), ) total = max(1, int(total_pages or 1)) page = min(total, max(1, int(current_page or 1) + 1)) return ( _pdf_iframe(report_name, page=page), page, total, f"Page: {page} / {total}", gr.update(interactive=page > 1), gr.update(interactive=page < total), ) def on_run_start(): return "## Waiting......", "", _render_waiting("Waiting......") def _has_openai_api_key(local_api_key: str) -> bool: if str(local_api_key or "").strip(): return True if os.getenv("OPENAI_API_KEY", "").strip(): return True if os.getenv("OPENAI_API_KEY_88996", "").strip(): return True return False def _has_gemini_api_key(local_api_key: str) -> bool: if str(local_api_key or "").strip(): return True if os.getenv("GEMINI_API_KEY", "").strip(): return True return False def do_query(question, doc_mode_label, rag_mode, embed_model, gen_model, openai_api_key, gemini_api_key, top_k): empty_btns = _default_claim_button_updates() empty_state_contexts = [] empty_state_trace = [] openai_key = str(openai_api_key or "").strip() gemini_key = str(gemini_api_key or "").strip() if not question or not question.strip(): yield "\u26a0\ufe0f Please enter a question.", "", "", "", "", empty_state_contexts, empty_state_trace, *empty_btns return if (not HAS_GPU) and ("(API)" not in str(gen_model)): msg = "\u26a0\ufe0f No GPU detected. Please use an API generation model." yield msg, "", msg, "", "", empty_state_contexts, empty_state_trace, *empty_btns return if (str(embed_model) in OPENAI_EMBED_MODELS_SET) and (not _has_openai_api_key(openai_key)): msg = ( "\u26a0\ufe0f OpenAI embedding model selected but API key is missing. " "Please input API key or set OPENAI_API_KEY." ) yield msg, "", msg, "", "", empty_state_contexts, empty_state_trace, *empty_btns return if "(API)" in str(gen_model): if _is_gemini_generation_model(gen_model): if not _has_gemini_api_key(gemini_key): msg = ( "\u26a0\ufe0f Gemini API generation model selected but API key is missing. " "Please input API key or set GEMINI_API_KEY." ) yield msg, "", msg, "", "", empty_state_contexts, empty_state_trace, *empty_btns return elif not _has_openai_api_key(openai_key): msg = ( "\u26a0\ufe0f OpenAI API generation model selected but API key is missing. " "Please input API key or set OPENAI_API_KEY." ) yield msg, "", msg, "", "", empty_state_contexts, empty_state_trace, *empty_btns return if openai_key: os.environ["OPENAI_API_KEY"] = openai_key if gemini_key: os.environ["GEMINI_API_KEY"] = gemini_key backend_api_key = openai_key if (not backend_api_key) and _is_gemini_generation_model(gen_model): backend_api_key = gemini_key doc_mode = "single" if doc_mode_label == "Single-document" else "multi" rag_mode = str(rag_mode or "ClimateRAG") q = question.strip() try: base_top_k = max(1, int(top_k)) except Exception: base_top_k = 5 t0 = time.perf_counter() if rag_mode != "ClimateRAG": answer, contexts = run_rag( question=q, chunk_mode="length", doc_mode=doc_mode, top_k=base_top_k, embed_name=embed_model, gen_model=gen_model, api_key=backend_api_key, ) elapsed = time.perf_counter() - t0 answer_md = _format_answer(answer) evidence_md = _format_evidence(contexts) status = f"\u2705 Baseline RAG complete: retrieved {len(contexts)} passages." timing_md = f"\u23f1\ufe0f **Elapsed:** `{elapsed:.2f}s`" pipeline_md = ( "## Baseline RAG\n" f"- Retrieved **{len(contexts)}** passages\n" "- Single-step retrieval + generation completed." ) yield answer_md, evidence_md, status, timing_md, pipeline_md, contexts, [], *empty_btns return answer_md = "*Waiting for STEP 2 answer...*" evidence_md = "*Waiting for retrieval...*" status = "⏳ ClimateRAG pipeline started." timing_md = "" pipeline_md = _render_waiting("STEP 1 Waiting......") yield answer_md, evidence_md, status, timing_md, pipeline_md, empty_state_contexts, empty_state_trace, *empty_btns try: # ---------- Step 1: retrieval + clustering ---------- step1 = run_trustworthy_step1( question=q, doc_mode=doc_mode, top_k=base_top_k, embed_name=embed_model, gen_model=gen_model, api_key=backend_api_key, ) step1_md = _render_step1_clusters_md(step1) evidence_md = _format_evidence(step1.get("contexts", [])) status = "⏳ STEP 1 completed. Running STEP 2..." pipeline_md = step1_md + "\n\n---\n\n" + _render_waiting("STEP 2 Waiting......") yield answer_md, evidence_md, status, "", pipeline_md, step1.get("contexts", []), empty_state_trace, *empty_btns # ---------- Step 2: answer generation ---------- step2 = run_trustworthy_step2( question=q, doc_mode=doc_mode, contexts=step1.get("contexts", []), clusters=step1.get("clusters", []), gen_model=gen_model, api_key=backend_api_key, ) answer_md = _format_answer(step2.get("answer", "")) step2_md = _render_step2_claims_md(step2) status = "⏳ STEP 2 completed. Running STEP 3..." pipeline_md = step1_md + "\n\n---\n\n" + step2_md + "\n\n---\n\n" + _render_waiting("STEP 3 Waiting......") yield answer_md, evidence_md, status, "", pipeline_md, step1.get("contexts", []), empty_state_trace, *empty_btns # ---------- Step 3: claim extractor ---------- step3 = run_trustworthy_step3_claims( question=q, answer=step2.get("answer", ""), contexts=step1.get("contexts", []), doc_mode=doc_mode, gen_model=gen_model, api_key=backend_api_key, ) step3_md = _render_step3_md(step3) step2_summary_md = _render_step2_summary_md(step2) final_trace = _prepare_claim_trace(step3) btn_updates = _claim_button_updates(final_trace) elapsed = time.perf_counter() - t0 status = ( "\u2705 ClimateRAG pipeline completed: " f"{len(step1.get('contexts', []))} passages, {len(step3.get('claims', []))} claims extracted." ) timing_md = f"\u23f1\ufe0f **Elapsed:** `{elapsed:.2f}s`" pipeline_md = step1_md + "\n\n---\n\n" + step2_summary_md + "\n\n---\n\n" + step3_md yield answer_md, evidence_md, status, timing_md, pipeline_md, step1.get("contexts", []), final_trace, *btn_updates return except Exception as e: elapsed = time.perf_counter() - t0 err = f"\u26a0\ufe0f ClimateRAG pipeline failed: {e}" timing_md = f"\u23f1\ufe0f **Elapsed before failure:** `{elapsed:.2f}s`" yield "*ClimateRAG pipeline failed.*", "", err, timing_md, f"{_render_waiting('Waiting......')}\n\n{err}", empty_state_contexts, empty_state_trace, *empty_btns return def build_report_name_list(): """Build report name list without requiring local PDF files.""" reports = list_reports() names = sorted({str(r.get("name", "")).strip() for r in reports if isinstance(r, dict) and str(r.get("name", "")).strip()}) if names: return names # Fallback to chunk JSON source names when Reports/ is removed. try: chunks = get_report_chunks("structure") names = sorted([str(x).strip() for x in chunks.keys() if str(x).strip()]) if names: return names except Exception: pass try: chunks = get_report_chunks("length") names = sorted([str(x).strip() for x in chunks.keys() if str(x).strip()]) if names: return names except Exception: pass return [] def render_report_names_md(names): if not names: return "_No report names found from local PDFs or chunk JSON sources._" lines = [f"### Report Names ({len(names)})", ""] lines.extend([f"- `{n}`" for n in names]) return "\n".join(lines) # ======================== CSS ======================== CUSTOM_CSS = """ :root { --font: "Segoe UI", Roboto, Helvetica, Arial, sans-serif !important; } html, body, button, input, textarea, select { font-family: "Segoe UI", Roboto, Helvetica, Arial, sans-serif !important; -webkit-font-smoothing: antialiased; -moz-osx-font-smoothing: grayscale; } /* Hide image toolbar buttons */ .gradio-image button, .gradio-image .absolute { display: none !important; } footer { display: none !important; } .built-with { display: none !important; } .hint-text { color: #666; font-size: 0.95em; margin-top: 2px; margin-bottom: 8px; width: 100%; max-width: none; line-height: 1.45; white-space: normal; } .logo-header { display: flex; align-items: center; justify-content: center; padding: 20px 16px 12px 16px; background: linear-gradient(180deg, #f0f7ff 0%, #ffffff 100%); border-radius: 12px; margin-bottom: 8px; min-height: 124px; } .logo-header-text { text-align: center; width: 100%; max-width: 900px; } .logo-header h2 { margin: 0 0 2px 0; color: #1a5276; font-size: 1.45em; letter-spacing: 0.02em; } .logo-header p { color: #666; font-size: 0.95em; margin: 0; } .mock-banner { background: #fff3cd; border: 1px solid #ffc107; border-radius: 8px; padding: 12px 20px; margin: 0 0 12px 0; color: #856404; font-size: 0.92em; } .custom-footer { text-align: center; padding: 14px 0; color: #bbb; font-size: 0.83em; border-top: 1px solid #eee; margin-top: 20px; } .waiting-banner { font-size: 2rem; font-weight: 700; color: #d35400; text-align: center; } .maturity-card { border: 1px solid #dbe7f3; border-radius: 12px; padding: 14px; background: linear-gradient(180deg, #f8fbff 0%, #ffffff 100%); } .maturity-card h3 { margin: 0 0 12px 0; color: #154360; } .maturity-card h4 { margin: 10px 0 6px 0; color: #1b4f72; } .maturity-table-wrap { overflow-x: auto; } .maturity-table { width: 100%; border-collapse: collapse; font-size: 0.95rem; } .maturity-table th, .maturity-table td { border: 1px solid #d6e4f0; padding: 10px; vertical-align: top; text-align: left; line-height: 1.45; } .maturity-table thead th { background: #eaf3ff; } .maturity-table tbody tr:nth-child(even) { background: #fbfdff; } .maturity-badge { display: inline-block; padding: 2px 10px; border-radius: 999px; font-weight: 600; font-size: 0.86rem; } .maturity-badge.level-high { background: #e8f8f0; color: #117864; } .maturity-badge.level-moderate { background: #fff4e5; color: #9c640c; } .maturity-badge.level-low { background: #fdecea; color: #922b21; } .maturity-badge.level-insufficient { background: #fdecea; color: #922b21; } .maturity-badge.level-unknown { background: #eef2f7; color: #34495e; } .maturity-list { margin: 0; padding-left: 18px; } .maturity-list li { margin: 0 0 6px 0; } .muted { color: #9aa5b1; } .maturity-conclusion { margin-top: 12px; border-top: 1px dashed #cad9e8; padding-top: 10px; } .maturity-conclusion h4 { margin: 0 0 6px 0; color: #1b4f72; } .maturity-conclusion p { margin: 0; } .maturity-note { margin-top: 10px; padding: 8px 10px; border: 1px dashed #cad9e8; border-radius: 8px; color: #516274; background: #f8fbff; } .metric-chip-wrap { margin-top: 10px; } .metric-chip { display: inline-block; margin: 4px 6px 0 0; padding: 4px 10px; border-radius: 999px; border: 1px solid #c8ddf2; background: #edf5ff; color: #1b4f72; font-size: 0.84rem; } .metric-chip.trend-up { background: #e8f8f0; border-color: #bfe8d3; color: #117864; } .metric-chip.trend-down { background: #fdecea; border-color: #f3c6c2; color: #922b21; } .metric-chip.trend-flat { background: #fff4e5; border-color: #f2ddba; color: #9c640c; } .metric-chip.trend-unknown { background: #eef2f7; border-color: #d6dce3; color: #5d6d7e; } .maturity-split { display: grid; grid-template-columns: 1fr 1fr; gap: 12px; margin-top: 10px; } .confidence-card { margin-top: 12px; border: 1px solid #dbe7f3; border-radius: 10px; padding: 10px 12px; background: #f8fbff; } .confidence-card h4 { margin: 0 0 8px 0; color: #1b4f72; } .conf-row { display: grid; grid-template-columns: 70px 1fr 60px; gap: 8px; align-items: center; margin: 6px 0; } .conf-label { font-weight: 600; color: #34495e; } .conf-bar { height: 10px; border-radius: 999px; background: #e9eff6; overflow: hidden; } .conf-fill { height: 100%; border-radius: 999px; } .conf-fill.conf-high { background: #27ae60; } .conf-fill.conf-medium { background: #f39c12; } .conf-fill.conf-low { background: #e74c3c; } .conf-value { text-align: right; color: #566573; font-variant-numeric: tabular-nums; } @media (max-width: 900px) { .logo-header { min-height: auto; padding-top: 12px; padding-bottom: 12px; flex-direction: column; } .maturity-split { grid-template-columns: 1fr; } } """ # ======================== Gradio UI ======================== with gr.Blocks( title="Climate Disclosure RAG" ) as demo: # ---------- Header ---------- gr.HTML( f"""

    Climate Disclosure RAG

    AI-powered analysis of corporate sustainability & climate disclosures

    """ ) # ==================== Tab 1: Question Answering ==================== with gr.Tab("\U0001f50d Question Answering"): gr.Markdown("### \U0001f4ac Ask a Question About Sustainability Reports") with gr.Row(): doc_mode_radio = gr.Radio( choices=["Multi-document", "Single-document"], value="Multi-document", label="Question Type", info="Single: ask about one report | Multi: compare across reports", ) single_hint = gr.Markdown( '

    ' '\U0001f4a1 Tip: We recommend prefixing your question with the report name, ' 'e.g. "For [Report 1 Name] and [Report 2 Name], does ...?"' '

    ', visible=True, ) question_box = gr.Textbox( label="Your Question", placeholder=PLACEHOLDER_MULTI, lines=3, max_lines=6, info='Please click "Use Example Question" to use the recommended question.', ) use_example_btn = gr.Button("Use Example Question", variant="primary") gr.Markdown("#### \u2699\ufe0f Model Configuration") with gr.Row(): with gr.Column(scale=1): rag_mode_dd = gr.Dropdown( choices=["ClimateRAG", "Baseline RAG"], value="ClimateRAG", label="RAG Mode", ) with gr.Column(scale=1): embed_model_dd = gr.Dropdown( choices=EMBED_MODELS, value=EMBED_MODELS[0], label="\U0001f9e0 Embedding Model", ) with gr.Column(scale=1): gen_model_dd = gr.Dropdown( choices=GEN_MODELS, value=(GEN_MODELS[0] if HAS_GPU else API_GEN_MODEL), label="\U0001f916 Generation Model", ) if not HAS_GPU: gr.Markdown( "GPU not detected: local generation models are disabled. " "Only API generation models are available." ) gr.Markdown( "Disabled (GPU-only): " + ", ".join(GPU_GEN_MODELS) + "" ) default_gen_model = GEN_MODELS[0] if HAS_GPU else API_GEN_MODEL default_embed_model = EMBED_MODELS[0] default_need_openai_key = ( (default_embed_model in OPENAI_EMBED_MODELS_SET) or (("(API)" in str(default_gen_model)) and (not _is_gemini_generation_model(default_gen_model))) ) default_need_gemini_key = ("(API)" in str(default_gen_model)) and _is_gemini_generation_model(default_gen_model) openai_api_key_box = gr.Textbox( label="\U0001f511 OpenAI API Key", type="password", placeholder="sk-...", value=DEFAULT_OPENAI_API_KEY, visible=default_need_openai_key, info="Required for OpenAI embedding models and OpenAI API generation models.", ) gemini_api_key_box = gr.Textbox( label="\U0001f511 Gemini API Key", type="password", placeholder="AIza...", value=DEFAULT_GEMINI_API_KEY, visible=default_need_gemini_key, info="Required for Gemini API generation models.", ) top_k_slider = gr.Slider( minimum=1, maximum=20, value=5, step=1, label="\U0001f3af Top-K Retrieved Passages", ) submit_btn = gr.Button("\U0001f680 Run Analysis", variant="primary", size="lg") status_md = gr.Markdown("") timing_md = gr.Markdown("") with gr.Row(): with gr.Column(scale=1): gr.Markdown("#### ClimateRAG Pipeline") pipeline_md = gr.Markdown( value="*Three-step ClimateRAG pipeline output will appear here after Run.*", sanitize_html=False, ) gr.Markdown("#### Generated Answer") answer_box = gr.Markdown( value="*Answer will appear here after you click Run.*", sanitize_html=False, ) gr.Markdown("#### Claim Trace (Click to Highlight Evidence)") with gr.Row(): claim_btn_1 = gr.Button("Claim 1", visible=False) claim_btn_2 = gr.Button("Claim 2", visible=False) claim_btn_3 = gr.Button("Claim 3", visible=False) clear_highlight_btn = gr.Button("Clear Highlight", visible=True) with gr.Column(scale=1): gr.Markdown("#### Retrieved Evidence") evidence_box = gr.Markdown( value="*Results will appear here after you click Run.*", sanitize_html=False, ) contexts_state = gr.State([]) claim_trace_state = gr.State([]) # ---- Wiring ---- doc_mode_radio.change( fn=on_doc_mode_change, inputs=[doc_mode_radio], outputs=[question_box, single_hint], ) use_example_btn.click( fn=lambda mode: PLACEHOLDER_SINGLE if mode == "Single-document" else PLACEHOLDER_MULTI, inputs=[doc_mode_radio], outputs=[question_box], ) gen_model_dd.change( fn=on_model_selection_change, inputs=[gen_model_dd, embed_model_dd], outputs=[openai_api_key_box, gemini_api_key_box], queue=False, ) embed_model_dd.change( fn=on_model_selection_change, inputs=[gen_model_dd, embed_model_dd], outputs=[openai_api_key_box, gemini_api_key_box], queue=False, ) demo.load( fn=on_model_selection_change, inputs=[gen_model_dd, embed_model_dd], outputs=[openai_api_key_box, gemini_api_key_box], queue=False, ) submit_btn.click( fn=on_run_start, outputs=[status_md, timing_md, pipeline_md], queue=False, ).then( fn=do_query, inputs=[ question_box, doc_mode_radio, rag_mode_dd, embed_model_dd, gen_model_dd, openai_api_key_box, gemini_api_key_box, top_k_slider, ], outputs=[ answer_box, evidence_box, status_md, timing_md, pipeline_md, contexts_state, claim_trace_state, claim_btn_1, claim_btn_2, claim_btn_3, ], ) claim_btn_1.click( fn=lambda ctx, trace: on_claim_click(0, ctx, trace), inputs=[contexts_state, claim_trace_state], outputs=[evidence_box], queue=False, ) claim_btn_2.click( fn=lambda ctx, trace: on_claim_click(1, ctx, trace), inputs=[contexts_state, claim_trace_state], outputs=[evidence_box], queue=False, ) claim_btn_3.click( fn=lambda ctx, trace: on_claim_click(2, ctx, trace), inputs=[contexts_state, claim_trace_state], outputs=[evidence_box], queue=False, ) clear_highlight_btn.click( fn=clear_claim_highlight, inputs=[contexts_state], outputs=[evidence_box], queue=False, ) # ---- Tab 2: Document Library ---- with gr.Tab("\U0001f4da Document Library"): gr.Markdown( "### Sustainability Report Collection\n" "Direct PDF download is disabled in this Space. " "Use the official GitHub link to access report files." ) gr.Markdown(f"Report download link: [ClimRetrieve Reports]({REPORTS_GITHUB_URL})") report_names = build_report_name_list() gr.Markdown(render_report_names_md(report_names)) # ==================== Tab 3: About ==================== with gr.Tab("ℹ️ About"): gr.Markdown(""" ### ClimateRAG — Climate Disclosure Retrieval-Augmented Generation for Evidence-based Question-Answering Increasingly stringent global regulations require companies to provide detailed and auditable climate-related disclosures. These reports are often lengthy and visually complex, making manual analysis challenging for regulators and auditors who require precise evidence grounding rather than free-form answers. ClimateRAG is a structured processing and reasoning framework designed for automated climate disclosure analysis. The system integrates hierarchical document chunking, an agent-based reasoning pipeline, and a claim extractor module to produce traceable, evidence-linked, and auditable outputs. It supports both single-document and multi-document analysis scenarios. We additionally introduce a dataset of 367 expert-annotated question–answer pairs covering realistic regulatory and audit workflows. Experimental evaluation demonstrates the effectiveness and efficiency of the proposed framework for climate disclosure analysis. The goal of ClimateRAG is to bridge Large Language Models with the rigorous standards required in regulatory auditing and sustainability reporting. --- ### Key Contributions 1. We develop ClimateRAG, the first system specifically designed for auditable and evidence-linked climate disclosure analysis with multi-document reasoning capability. 2. We construct a dataset of 367 annotated QA pairs spanning single-document and cross-document settings, aligned with real-world regulatory and auditing scenarios. 3. We conduct systematic evaluation to assess both retrieval and generation performance, validating the robustness and practical utility of the system. --- ### Project Website https://cheng-tf.github.io/ClimateRAG/ """) # ---------- Custom Footer ---------- gr.HTML( '' ) # ======================== Launch ======================== if __name__ == "__main__": server_name = os.getenv("APP_HOST", "0.0.0.0") server_port = int(os.getenv("APP_PORT", "7860")) root_path = os.getenv("APP_ROOT_PATH", "") share = os.getenv("APP_SHARE", "false").lower() in {"1", "true", "yes", "y"} allowed_paths = [ p for p in [REPORTS_DIR, SCRIPT_DIR] if isinstance(p, str) and os.path.exists(p) ] launch_kwargs = dict( server_name=server_name, server_port=server_port, share=share, show_error=True, root_path=root_path if root_path else None, css=CUSTOM_CSS, theme=gr.themes.Soft(primary_hue="blue", secondary_hue="slate"), ) if allowed_paths: launch_kwargs["allowed_paths"] = allowed_paths demo.launch(**launch_kwargs)