File size: 6,223 Bytes
6347098
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# models/bias/bias_utils.py

"""Bias detection utilities for Penny.

Provides zero-shot classification for detecting potential bias in text
responses. Uses a classification model to score text against a fixed set
of candidate labels (neutral vs. biased language patterns).

Degrades gracefully: if the project's model loader cannot be imported,
the module still loads and reports the service as unavailable.
"""

import asyncio
from typing import Dict, Any, Optional, List
import logging

# --- Logging Setup ---
logger = logging.getLogger(__name__)

# --- Model Loader Import ---
# Optional dependency: when the app package is absent (e.g. a stripped-down
# environment), fall back to "unavailable" mode instead of failing import.
try:
    from app.model_loader import load_model_pipeline
    MODEL_LOADER_AVAILABLE = True
except ImportError:
    MODEL_LOADER_AVAILABLE = False
    logger.warning("Could not import load_model_pipeline. Bias detection will operate in fallback mode.")

# Global variable to store the loaded pipeline for re-use
# (populated lazily by _initialize_bias_pipeline()).
BIAS_PIPELINE: Optional[Any] = None
AGENT_NAME = "penny-bias-checker"

# Define the labels for Zero-Shot Classification.
CANDIDATE_LABELS = [
    "neutral and objective",
    "contains political bias",
    "uses emotional language",
    "is factually biased",
]


def _initialize_bias_pipeline() -> bool:
    """Initialize the bias detection pipeline exactly once.

    Loads the zero-shot classification pipeline via the project's model
    loader and caches it in the module-level ``BIAS_PIPELINE`` so repeat
    calls are cheap no-ops.

    Returns:
        bool: True if the pipeline is loaded (or already was), False if
        the loader is unavailable or loading failed.
    """
    global BIAS_PIPELINE

    # Already loaded — nothing to do.
    if BIAS_PIPELINE is not None:
        return True

    if not MODEL_LOADER_AVAILABLE:
        # Lazy %-style args: the message is only formatted if this level is enabled.
        logger.warning("%s: Model loader not available, pipeline initialization skipped", AGENT_NAME)
        return False

    try:
        logger.info("Loading %s...", AGENT_NAME)
        BIAS_PIPELINE = load_model_pipeline(AGENT_NAME)
    except Exception as e:
        # Swallow the failure deliberately: callers check the return value
        # and the service degrades to "unavailable" instead of crashing.
        logger.error("Failed to load %s: %s", AGENT_NAME, e, exc_info=True)
        BIAS_PIPELINE = None
        return False

    logger.info("Model %s loaded successfully", AGENT_NAME)
    return True


# Attempt to initialize pipeline at module load (best-effort: check_bias()
# retries initialization on demand if this fails here).
_initialize_bias_pipeline()


async def check_bias(text: str) -> Dict[str, Any]:
    """Run zero-shot classification to check for bias in the input text.

    Scores the text against ``CANDIDATE_LABELS``:

    - neutral and objective
    - contains political bias
    - uses emotional language
    - is factually biased

    Args:
        text: The string of text to analyze for bias.

    Returns:
        Dictionary containing:
            - analysis: List of ``{"label", "score"}`` dicts sorted by
              score (descending); empty on failure.
            - available: Whether the bias detection service is operational.
            - message: Optional error or status message (absent on success).

    Example:
        >>> result = await check_bias("This is neutral text.")
        >>> result['analysis'][0]['label']
        'neutral and objective'
    """
    global BIAS_PIPELINE

    # Input validation: reject non-strings and falsy values up front.
    if not text or not isinstance(text, str):
        logger.warning("check_bias called with invalid text input")
        return {
            "analysis": [],
            "available": False,
            "message": "Invalid input: text must be a non-empty string"
        }

    # Strip text to avoid processing whitespace-only input.
    text = text.strip()
    if not text:
        logger.warning("check_bias called with empty text after stripping")
        return {
            "analysis": [],
            "available": False,
            "message": "Invalid input: text is empty"
        }

    # Ensure pipeline is initialized; retry once if the module-load attempt failed.
    if BIAS_PIPELINE is None:
        logger.warning("%s pipeline not available, attempting re-initialization", AGENT_NAME)
        if not _initialize_bias_pipeline():
            return {
                "analysis": [],
                "available": False,
                "message": "Bias detection service is currently unavailable"
            }

    try:
        # get_running_loop() is the correct call inside a coroutine;
        # get_event_loop() is deprecated in this context since Python 3.10.
        loop = asyncio.get_running_loop()

        # Run inference in the default thread pool so the blocking model
        # call does not stall the event loop.
        results = await loop.run_in_executor(
            None,
            lambda: BIAS_PIPELINE(
                text,
                CANDIDATE_LABELS,
                multi_label=True
            )
        )

        # Validate the result structure: this code path expects a dict with
        # parallel 'labels' and 'scores' lists.
        if not results or not isinstance(results, dict):
            logger.error("Bias detection returned unexpected format: %s", type(results))
            return {
                "analysis": [],
                "available": True,
                "message": "Inference returned unexpected format"
            }

        labels = results.get('labels', [])
        scores = results.get('scores', [])

        if not labels or not scores:
            logger.warning("Bias detection returned empty labels or scores")
            return {
                "analysis": [],
                "available": True,
                "message": "No classification results returned"
            }

        # Pair each label with its confidence score and sort, highest first.
        analysis = [
            {"label": label, "score": float(score)}
            for label, score in zip(labels, scores)
        ]
        analysis.sort(key=lambda x: x['score'], reverse=True)

        logger.debug(
            "Bias check completed successfully, top result: %s (%.3f)",
            analysis[0]['label'], analysis[0]['score']
        )

        return {
            "analysis": analysis,
            "available": True
        }

    except asyncio.CancelledError:
        # Always re-raise cancellation — swallowing it breaks task cleanup.
        logger.warning("Bias detection task was cancelled")
        raise

    except Exception as e:
        logger.error("Error during bias detection inference: %s", e, exc_info=True)
        return {
            "analysis": [],
            "available": False,
            "message": f"Bias detection error: {str(e)}"
        }


def get_bias_pipeline_status() -> Dict[str, Any]:
    """Report the current state of the bias detection pipeline.

    Returns:
        Dict with the agent name, whether the pipeline is currently
        loaded, and whether the model loader could be imported.
    """
    status = {
        "agent_name": AGENT_NAME,
        "available": BIAS_PIPELINE is not None,
        "model_loader_available": MODEL_LOADER_AVAILABLE,
    }
    return status