# Penny_V2 / bias_utils.py
# (Hugging Face file-viewer header removed: uploader "pythonprincess",
#  commit "Upload 25 files" 6347098 verified, raw/history/blame, 6.22 kB)
# models/bias/bias_utils.py
"""
Bias Detection Utilities for Penny
Provides zero-shot classification for detecting potential bias in text responses.
Uses a classification model to identify neutral content vs. biased language patterns.
"""
import asyncio
from typing import Dict, Any, Optional, List
import logging
# --- Logging Setup ---
logger = logging.getLogger(__name__)

# --- Model Loader Import ---
# The project-local loader is optional: if the import fails, the module still
# loads and check_bias() degrades to an "unavailable" response (fallback mode).
try:
    from app.model_loader import load_model_pipeline
    MODEL_LOADER_AVAILABLE = True
except ImportError:
    MODEL_LOADER_AVAILABLE = False
    logger.warning("Could not import load_model_pipeline. Bias detection will operate in fallback mode.")

# Global variable to store the loaded pipeline for re-use
# (populated by _initialize_bias_pipeline(); None while unloaded or after a failed load)
BIAS_PIPELINE: Optional[Any] = None

# Identifier handed to load_model_pipeline() and used in log messages.
AGENT_NAME: str = "penny-bias-checker"

# Define the labels for Zero-Shot Classification.
# Each label is scored independently against the input text (multi_label=True in check_bias).
CANDIDATE_LABELS: List[str] = [
    "neutral and objective",
    "contains political bias",
    "uses emotional language",
    "is factually biased",
]
def _initialize_bias_pipeline() -> bool:
"""
Initializes the bias detection pipeline only once.
Returns:
bool: True if pipeline loaded successfully, False otherwise
"""
global BIAS_PIPELINE
if BIAS_PIPELINE is not None:
return True
if not MODEL_LOADER_AVAILABLE:
logger.warning(f"{AGENT_NAME}: Model loader not available, pipeline initialization skipped")
return False
try:
logger.info(f"Loading {AGENT_NAME}...")
BIAS_PIPELINE = load_model_pipeline(AGENT_NAME)
logger.info(f"Model {AGENT_NAME} loaded successfully")
return True
except Exception as e:
logger.error(f"Failed to load {AGENT_NAME}: {e}", exc_info=True)
BIAS_PIPELINE = None
return False
# Attempt to initialize pipeline at module load.
# NOTE(review): this makes model loading an import-time side effect; a failure
# here is logged and non-fatal because check_bias() retries lazily on demand.
_initialize_bias_pipeline()
async def check_bias(text: str) -> Dict[str, Any]:
    """
    Run zero-shot classification to check for bias in the input text.

    The pre-loaded classification model scores the text against each entry in
    CANDIDATE_LABELS independently (multi_label=True):
        - Neutral and objective language
        - Political bias
        - Emotional language
        - Factual bias

    Args:
        text: The string of text to analyze for bias.

    Returns:
        Dictionary containing:
            - analysis: list of {"label": str, "score": float} dicts, sorted by
              score in descending order (empty on any failure)
            - available: whether the bias detection service is operational
            - message: optional error or status message (absent on success)

    Raises:
        asyncio.CancelledError: re-raised so task cancellation is never swallowed.

    Example:
        >>> result = await check_bias("This is neutral text.")
        >>> result['analysis'][0]['label']
        'neutral and objective'
    """
    global BIAS_PIPELINE

    # --- Input validation ---
    if not text or not isinstance(text, str):
        logger.warning("check_bias called with invalid text input")
        return {
            "analysis": [],
            "available": False,
            "message": "Invalid input: text must be a non-empty string"
        }

    # Strip text to avoid processing whitespace
    text = text.strip()
    if not text:
        logger.warning("check_bias called with empty text after stripping")
        return {
            "analysis": [],
            "available": False,
            "message": "Invalid input: text is empty"
        }

    # --- Lazy (re-)initialization if the import-time load failed ---
    if BIAS_PIPELINE is None:
        logger.warning(f"{AGENT_NAME} pipeline not available, attempting re-initialization")
        if not _initialize_bias_pipeline():
            return {
                "analysis": [],
                "available": False,
                "message": "Bias detection service is currently unavailable"
            }

    # Bind the pipeline to a local so a concurrent reset of the global cannot
    # turn it into None (or a different model) between here and the executor call.
    pipeline = BIAS_PIPELINE

    try:
        # get_running_loop() is the supported accessor inside a coroutine;
        # get_event_loop() is deprecated in this context since Python 3.10.
        loop = asyncio.get_running_loop()

        # Run inference in the default thread pool to avoid blocking the event loop
        results = await loop.run_in_executor(
            None,
            lambda: pipeline(
                text,
                CANDIDATE_LABELS,
                multi_label=True
            )
        )

        # Validate results structure (a single-string input is expected to
        # yield one dict with parallel 'labels'/'scores' lists)
        if not results or not isinstance(results, dict):
            logger.error(f"Bias detection returned unexpected format: {type(results)}")
            return {
                "analysis": [],
                "available": True,
                "message": "Inference returned unexpected format"
            }

        labels = results.get('labels', [])
        scores = results.get('scores', [])
        if not labels or not scores:
            logger.warning("Bias detection returned empty labels or scores")
            return {
                "analysis": [],
                "available": True,
                "message": "No classification results returned"
            }

        # Build analysis results, coercing scores to plain floats
        analysis = [
            {"label": label, "score": float(score)}
            for label, score in zip(labels, scores)
        ]

        # Sort by confidence score (descending)
        analysis.sort(key=lambda x: x['score'], reverse=True)

        logger.debug(f"Bias check completed successfully, top result: {analysis[0]['label']} ({analysis[0]['score']:.3f})")
        return {
            "analysis": analysis,
            "available": True
        }

    except asyncio.CancelledError:
        # Propagate cancellation; only log it for observability.
        logger.warning("Bias detection task was cancelled")
        raise
    except Exception as e:
        logger.error(f"Error during bias detection inference: {e}", exc_info=True)
        return {
            "analysis": [],
            "available": False,
            "message": f"Bias detection error: {str(e)}"
        }
def get_bias_pipeline_status() -> Dict[str, Any]:
    """Report the current availability of the bias-detection pipeline.

    Returns:
        Dictionary with the agent name, whether a pipeline instance is
        currently loaded, and whether the model-loader import succeeded.
    """
    status: Dict[str, Any] = {
        "agent_name": AGENT_NAME,
        "available": BIAS_PIPELINE is not None,
        "model_loader_available": MODEL_LOADER_AVAILABLE,
    }
    return status