import json
import re
from typing import List
from pydantic import BaseModel


# =========================
# Pydantic Model
# =========================

class BiasReport(BaseModel):
    detected_biases: List[str]
    risk_level: str
    explanations: List[str]
    mitigation_suggestions: List[str]
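
# The parser below maps the LLM's "findings_explanation", "mitigation_steps",
# and "overall_risk_level" keys onto this model's fields.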


# =========================
# JSON Normalizer
# =========================

def normalize_llm_json(raw: str) -> dict:
    """

    Extracts and parses a JSON object from noisy LLM output.

    Handles markdown fences, 'json' prefixes, and extra text.

    """

    if raw is None:
        raise ValueError("LLM returned None")

    raw = raw.strip()

    if not raw:
        raise ValueError("LLM returned empty output")

    # Remove ```json fences
    if raw.startswith("```"):
        parts = raw.split("```")
        if len(parts) >= 2:
            raw = parts[1].strip()

    # Remove a leading 'json' language tag left behind by the fence
    if raw.lower().startswith("json"):
        raw = raw[len("json"):].strip()

    # Extract the outermost JSON object (greedy: first '{' to last '}')
    match = re.search(r"\{.*\}", raw, re.DOTALL)
    if not match:
        raise ValueError(f"No JSON object found in LLM output:\n{raw}")

    json_text = match.group()

    return json.loads(json_text)
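
# For example (hypothetical input), a fenced response such as
#   '```json\n{"overall_risk_level": "low"}\n```'
# normalizes to {"overall_risk_level": "low"}.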


# =========================
# Bias Output Parser
# =========================

def parse_bias_output(raw: str) -> BiasReport:
    """

    Normalizes and parses LLM bias analysis output into BiasReport.

    Safely handles dict/list variations from the LLM.

    """

    data = normalize_llm_json(raw)

    # ---------- findings_explanation ----------
    findings = data.get("findings_explanation", [])

    if isinstance(findings, dict):
        detected_biases = list(findings.keys())
        explanations = list(findings.values())
    elif isinstance(findings, list):
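        # A bare list is treated as explanations only; bias names are unknown.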
        detected_biases = []
        explanations = findings
    else:
        detected_biases = []
        explanations = []

    # ---------- mitigation_steps ----------
    mitigation = data.get("mitigation_steps", [])

    if isinstance(mitigation, dict):
        mitigation_suggestions = list(mitigation.values())
    elif isinstance(mitigation, list):
        mitigation_suggestions = mitigation
    else:
        mitigation_suggestions = []

    # ---------- risk level ----------
    risk_level = data.get("overall_risk_level", "unknown")

    return BiasReport(
        detected_biases=detected_biases,
        risk_level=risk_level,
        explanations=explanations,
        mitigation_suggestions=mitigation_suggestions,
    )
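

# =========================
# Example Usage (sketch)
# =========================
# A minimal, hypothetical demo of the helpers above. The raw string is a
# made-up sample of noisy LLM output (fence, 'json' tag, trailing chatter);
# real responses will vary.

if __name__ == "__main__":
    sample = '''```json
{
    "findings_explanation": {
        "gender bias": "The job ad uses gendered language.",
        "age bias": "'Digital native' discourages older applicants."
    },
    "mitigation_steps": ["Use neutral wording.", "Drop age-coded phrases."],
    "overall_risk_level": "medium"
}
``` Let me know if you need anything else!'''

    report = parse_bias_output(sample)
    print(report)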