#!/usr/bin/env python
"""
Fraud Model Explainability Assistant - Strands Agents
An AI-powered assistant that helps fraud analysts and executives understand
why specific applications were flagged as fraudulent, translating complex
model outputs into actionable insights.
Author: Fraud Model Data Science Team
Use Cases:
- Executive briefings on fraud decisions
- Fair lending compliance documentation
- Analyst investigation support
- Model decision audit trails
This module is the production-ready, FastAPI-based version with Confluence integration.
Features:
- Comprehensive logging and monitoring
- Error handling and recovery
- Scheduled re-ingestion for keeping data fresh
- Performance metrics tracking
- FastAPI + uvicorn for Docker deployment
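Prerequisites:
- Configure .env with Confluence credentials (CONFLUENCE_URL, CONFLUENCE_EMAIL,
  CONFLUENCE_API_TOKEN); without them the app runs without Confluence integration
- Optionally set OPENAI_API_KEY to use OpenAI gpt-4o as the agent model
- Optional packages: python-dotenv (.env loading), apscheduler (daily refresh),
  pypdf and python-docx (PDF / DOCX attachments)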
"""
import base64
import csv
import io
import json
import logging
import os
import sys
import threading
import time
import warnings
from collections import deque
from datetime import datetime
from functools import lru_cache
from typing import Optional, List
# Silence ResourceWarning in this interpreter and, via PYTHONWARNINGS (which is
# replaced outright, not merged), in any child Python processes started from it.
warnings.filterwarnings(action="ignore", category=ResourceWarning)
os.environ["PYTHONWARNINGS"] = "ignore::ResourceWarning"

# Populate the environment from a local .env file when python-dotenv is
# installed; otherwise tell the operator to provide the variables directly.
try:
    from dotenv import load_dotenv

    load_dotenv()
except ImportError:
    print(
        "ā Warning: python-dotenv not installed. Install with: pip install python-dotenv",
        " Environment variables must be set manually.",
        sep="\n",
    )
from fastapi import FastAPI, HTTPException
from fastapi.responses import HTMLResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from strands import Agent
from strands.agent.conversation_manager import SlidingWindowConversationManager
from strands.models.openai import OpenAIModel
from strands.models.openai import OpenAIModel
from strands.handlers.callback_handler import PrintingCallbackHandler
# Telemetry
from telemetry import setup_telemetry
setup_telemetry()
# Import confluence-ingestor
from confluence_ingestor import ConfluenceRAG
from confluence_ingestor.adapters.strands import (
create_confluence_search_tool,
create_confluence_loader_tool,
)
# Import your existing fraud tools
from utils import (
get_application_summary,
explain_fraud_score,
compare_to_population,
check_fair_lending_flags,
get_identity_network,
get_model_performance,
SYSTEM_PROMPT as ORIGINAL_PROMPT,
)
# =============================================================================
# LOGGING CONFIGURATION
# =============================================================================
# Configure the root strands logger
logging.getLogger("strands").setLevel(logging.DEBUG)
# Add a handler to see the logs
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
handlers=[
logging.FileHandler('fraud_assistant_confluence.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# =============================================================================
# METRICS TRACKING
# =============================================================================
class ConfluenceMetrics:
    """Track Confluence integration performance metrics.

    Counters are mutated both while serving requests and from the background
    re-ingestion scheduler thread, so every update and snapshot is taken under
    a lock.

    Args:
        max_samples: Number of most-recent query durations retained in
            ``query_times`` (``None`` keeps all of them). The average reported
            by :meth:`get_stats` is computed from running totals and covers
            *every* recorded search, so bounding the sample window keeps memory
            constant in a long-running server without changing that average.
    """

    def __init__(self, max_samples: Optional[int] = 1000):
        self.search_count = 0
        self.cache_hits = 0
        self.cache_misses = 0
        self.errors = 0
        self.last_ingestion: Optional[datetime] = None
        # Recent durations only; the all-time average uses the totals below.
        self.query_times: deque = deque(maxlen=max_samples)
        self._total_query_time = 0.0
        self._timed_queries = 0
        self._lock = threading.Lock()

    def record_search(self, cached: bool = False, duration: float = 0.0):
        """Record one search, whether it was served from cache, and its duration.

        ``duration`` is presumably in seconds (the UI labels the average "s").
        """
        with self._lock:
            self.search_count += 1
            if cached:
                self.cache_hits += 1
            else:
                self.cache_misses += 1
            self.query_times.append(duration)
            self._total_query_time += duration
            self._timed_queries += 1

    def record_error(self):
        """Increment the error counter."""
        with self._lock:
            self.errors += 1

    def record_ingestion(self):
        """Stamp the time of the most recent data ingestion (local, naive datetime)."""
        with self._lock:
            self.last_ingestion = datetime.now()

    def get_stats(self) -> dict:
        """Return a consistent snapshot of the current metrics.

        Returns:
            Dict with ``total_searches``, ``cache_hit_rate`` (0.0 when there
            were no searches), ``avg_query_time`` (0.0 when nothing was
            timed), ``errors`` and ``last_ingestion`` (``str`` or ``None``).
        """
        with self._lock:
            return {
                "total_searches": self.search_count,
                "cache_hit_rate": (
                    self.cache_hits / self.search_count
                    if self.search_count > 0
                    else 0.0
                ),
                "avg_query_time": (
                    self._total_query_time / self._timed_queries
                    if self._timed_queries
                    else 0.0
                ),
                "errors": self.errors,
                "last_ingestion": str(self.last_ingestion) if self.last_ingestion else None,
            }


# Global metrics instance shared by the whole process.
_metrics = ConfluenceMetrics()
# =============================================================================
# CONFLUENCE INITIALIZATION
# =============================================================================
_confluence_rag: Optional[ConfluenceRAG] = None

# Confluence spaces ingested on first initialisation, mapped to the maximum
# number of pages to pull from each.
# NOTE(review): "Acquisitio" may be a truncated "Acquisition" — confirm the
# real space key in Confluence.
_DEFAULT_CONFLUENCE_SPACES = {
    "Acquisitio": 10,
}


def _log_confluence_space_pages(space_key: str, max_pages: int) -> None:
    """Log and print the titles of up to ``max_pages`` pages in ``space_key``.

    Diagnostic only: the space is fetched again through a fresh
    ``ConfluenceClient``, and any failure is logged as a warning instead of
    being propagated, so it can never fail ingestion.
    """
    try:
        from confluence_ingestor import ConfluenceClient

        client = ConfluenceClient.from_env()
        pages = client.load_space(space_key, max_pages=max_pages)
        if pages:
            logger.info(f"Pages in {space_key} ({len(pages)} total):")
            print(f" š Pages in {space_key}:")
            for i, page in enumerate(pages, 1):
                logger.info(f" {i}. {page.title} (ID: {page.page_id})")
                print(f" {i}. {page.title}")
        else:
            logger.warning(f"No pages found in space: {space_key}")
    except Exception as page_list_error:
        logger.warning(f"Could not retrieve page titles for {space_key}: {page_list_error}")


def _ingest_confluence_space(rag: ConfluenceRAG, space_key: str, max_pages: int) -> bool:
    """Ingest one Confluence space into ``rag`` without forcing a re-index.

    Returns:
        True if the space was ingested or deliberately skipped by the
        ingestor; False if ingestion raised (the error is logged, printed and
        counted in ``_metrics``, never propagated).
    """
    try:
        logger.info(f"Ingesting Confluence space: {space_key}")
        stats = rag.ingest_space(space_key, max_pages=max_pages, force=False)
        if stats.get("skipped"):
            logger.info(f"{space_key}: {stats['reason']}")
            print(f" ā {space_key}: {stats['reason']}")
        else:
            logger.info(f"{space_key}: Ingested {stats['pages']} pages")
            print(f" ā {space_key}: {stats['pages']} pages indexed")
            _log_confluence_space_pages(space_key, max_pages)
        return True
    except Exception as e:
        logger.error(f"Failed to ingest {space_key}: {e}")
        print(f" ā {space_key}: Failed - {e}")
        _metrics.record_error()
        return False


def init_confluence(spaces: Optional[dict] = None):
    """Initialise the process-wide Confluence RAG and ingest spaces, once.

    The first successful call builds ``_confluence_rag`` with
    ``ConfluenceRAG.from_env`` (HuggingFace embeddings, Chroma vector store)
    and ingests each configured space. Later calls return the cached
    instance without re-ingesting.

    Args:
        spaces: Mapping of space key to the maximum number of pages to ingest.
            Defaults to ``_DEFAULT_CONFLUENCE_SPACES``. It is only used on the
            call that actually initialises the RAG.

    Returns:
        The initialised ``ConfluenceRAG``.

    Raises:
        ValueError: If CONFLUENCE_URL, CONFLUENCE_EMAIL or
            CONFLUENCE_API_TOKEN is not set.
        Exception: Anything raised while building the RAG. It is logged,
            counted in ``_metrics`` and re-raised.
    """
    global _confluence_rag
    if _confluence_rag is not None:
        return _confluence_rag

    logger.info("Initializing Confluence integration...")
    required_vars = ["CONFLUENCE_URL", "CONFLUENCE_EMAIL", "CONFLUENCE_API_TOKEN"]
    missing_vars = [var for var in required_vars if not os.getenv(var)]
    if missing_vars:
        error_msg = f"Missing required environment variables: {', '.join(missing_vars)}"
        logger.error(error_msg)
        print(f"\nā ERROR: {error_msg}")
        print("\nš Setup Instructions:")
        print(" 1. Copy .env.example to .env:")
        print(" cp .env.example .env")
        print(" 2. Edit .env with your Confluence credentials")
        print(" 3. See CONFLUENCE_SETUP_GUIDE.md for detailed setup instructions")
        print("\nā App will run WITHOUT Confluence integration.\n")
        raise ValueError(f"Missing Confluence credentials: {', '.join(missing_vars)}")

    if spaces is None:
        spaces = _DEFAULT_CONFLUENCE_SPACES

    try:
        _confluence_rag = ConfluenceRAG.from_env(
            embedding_provider="huggingface",
            vector_store_type="chroma",
        )
        succeeded = 0
        for space_key, max_pages in spaces.items():
            if _ingest_confluence_space(_confluence_rag, space_key, max_pages):
                succeeded += 1
        # Only stamp an ingestion time when at least one space actually went
        # through; otherwise /api/metrics would report a fresh ingestion that
        # never happened.
        if succeeded:
            _metrics.record_ingestion()
        else:
            logger.warning("No Confluence spaces were ingested successfully.")
        logger.info("Confluence integration ready!")
        print("ā Confluence integration ready!")
    except Exception as e:
        logger.error(f"Confluence initialization failed: {e}")
        _metrics.record_error()
        raise
    return _confluence_rag
# =============================================================================
# SCHEDULED RE-INGESTION
# =============================================================================
def setup_scheduled_ingestion():
    """Start a background job that force-re-ingests Confluence daily at 02:00.

    The cron time is interpreted in the scheduler's default timezone.
    NOTE(review): for APScheduler 3.x that is the host's local zone, which is
    often UTC in containers — confirm this matches the intended "2 AM".

    Returns:
        The started ``BackgroundScheduler``, or ``None`` when apscheduler is
        not installed (scheduled ingestion is then disabled).
    """
    try:
        from apscheduler.schedulers.background import BackgroundScheduler
    except ImportError:
        logger.warning("apscheduler not installed. Scheduled ingestion disabled.")
        logger.warning("Install with: pip install apscheduler")
        return None

    def refresh_confluence():
        """Re-ingest Confluence spaces to pick up new content.

        Each space is handled independently, so one failing space does not
        prevent the others from refreshing.
        """
        logger.info("Starting scheduled Confluence re-ingestion...")
        try:
            rag = init_confluence()
        except Exception as e:
            logger.error(f"Scheduled re-ingestion failed: {e}")
            _metrics.record_error()
            return

        spaces = ["Acquisitio"]
        failures = 0
        for space in spaces:
            try:
                logger.info(f"Re-ingesting {space}...")
                stats = rag.ingest_space(space, max_pages=100, force=True)
                logger.info(f"{space}: Updated {stats['pages']} pages")
            except Exception as e:
                failures += 1
                logger.error(f"Scheduled re-ingestion failed for {space}: {e}")
                _metrics.record_error()

        if failures < len(spaces):
            _metrics.record_ingestion()
        if failures == 0:
            logger.info("Scheduled re-ingestion completed successfully")

    scheduler = BackgroundScheduler()
    # APScheduler 3.x defaults misfire_grace_time to 1 second, so a run whose
    # wake-up is even slightly late would be silently skipped for the whole
    # day. Allow it to start up to an hour late instead.
    scheduler.add_job(refresh_confluence, 'cron', hour=2, misfire_grace_time=3600)
    scheduler.start()
    logger.info("Scheduled re-ingestion enabled (runs daily at 2 AM)")
    return scheduler
# =============================================================================
# ENHANCED SYSTEM PROMPT
# =============================================================================
# System prompt used when the Confluence tools are available. When Confluence
# initialisation fails, create_enhanced_agent uses ORIGINAL_PROMPT (imported
# from utils) instead.
# The text is sent to the model verbatim (after .strip()).
# NOTE(review): it refers to tools named "confluence_search" and
# "confluence_loader"; presumably those are the names of the tools returned by
# create_confluence_search_tool / create_confluence_loader_tool — confirm they match.
ENHANCED_PROMPT = """
You are a Fraud Model Explainability Assistant for a major financial services company.
Your role is to help fraud analysts, data scientists, and executives understand
fraud model decisions and their implications.
You have access to tools that can:
1. Retrieve application summaries and fraud scores
2. Explain why applications received specific fraud scores (SHAP-style explanations)
3. Compare applications to approved/denied populations statistically
4. Check for fair lending compliance concerns
5. Analyze identity networks and linkages
6. Show model performance metrics
7. **Search company Confluence documentation** for policies, procedures, and guidelines
8. **Load full Confluence pages** to extract specific information
When answering questions:
- Be precise and data-driven
- Highlight the most important risk factors first
- Explain technical concepts in business terms when speaking to executives
- Always mention fair lending implications when relevant
- Provide actionable insights, not just data
**CRITICAL: How to Handle Confluence Information Requests**
When users ask you to "report", "list", "find", "show", or "provide" specific information from documents:
1. **FIRST**: Use the confluence_search tool to find the relevant document and identify its title
2. **SECOND**: Use the confluence_loader tool with BOTH space_key AND page_title parameters to load that specific page
3. **THIRD**: Extract and present the requested information directly in your response from the loaded content
4. **FOURTH**: Provide the Confluence page citation/link as a reference for verification
For flagged applications, structure your response as:
1. Quick summary (score, decision, risk level)
2. Top contributing factors
3. How unusual this is compared to the population
4. Any compliance considerations (extract relevant policies from Confluence, then cite sources)
5. Recommended next steps (reference procedures from playbooks if available)
Remember: Your explanations may be used in regulatory examinations and audits,
so be accurate and thorough.
""".strip()
# =============================================================================
# AGENT CREATION
# =============================================================================
# One attachment sent with a question to POST /api/ask.
class FilePayload(BaseModel):
data: str # Base64 encoded data; query_agent also strips a data-URL prefix (everything up to the first ",")
format: str # Case-insensitive type: png/jpeg/jpg/gif/webp go to the model as images; pdf/docx/csv are converted to text; anything else is decoded as UTF-8
name: str = "file" # Used in logs and in the "--- Content from <name> ---" header given to the agent
# Request body of POST /api/ask.
class QuestionRequest(BaseModel):
question: str # The user's natural-language question
files: Optional[List[FilePayload]] = None # Optional attachments processed by query_agent
# Lazily built, process-wide agent returned by create_enhanced_agent().
# NOTE(review): a single Agent — and therefore a single sliding-window
# conversation history — is shared by every query_agent() call, i.e. by all
# API users. Confirm that cross-request conversation memory is intended.
_cached_agent = None


def create_enhanced_agent():
    """Return the shared fraud agent, building it on first use.

    Tool and prompt selection:
      * If ``init_confluence()`` succeeds, the core fraud tools are joined by
        the Confluence search tool (k=5) and the page-loader tool, and
        ``ENHANCED_PROMPT`` is used.
      * Otherwise the failure is logged and counted, and the agent gets only
        the core fraud tools plus ``ORIGINAL_PROMPT``.

    Model selection: if ``OPENAI_API_KEY`` is set, the agent uses OpenAI
    ``gpt-4o`` (temperature 0.1, max 2048 tokens). Otherwise no model is
    passed and the Strands ``Agent`` default is used.

    The agent is cached even when Confluence was unavailable, so a later
    successful ``init_confluence()`` does not add the Confluence tools to an
    agent that has already been built.
    """
    global _cached_agent
    if _cached_agent is not None:
        return _cached_agent

    fraud_tools = [
        get_application_summary,
        explain_fraud_score,
        compare_to_population,
        check_fair_lending_flags,
        get_identity_network,
        get_model_performance,
    ]
    try:
        rag = init_confluence()
        tools = fraud_tools + [
            create_confluence_search_tool(rag=rag, k=5),
            create_confluence_loader_tool(max_pages=10),
        ]
        system_prompt = ENHANCED_PROMPT
    except Exception as e:
        logger.error(f"Confluence initialization failed: {e}")
        print(f"ā Confluence disabled: {e}")
        _metrics.record_error()
        tools = fraud_tools
        system_prompt = ORIGINAL_PROMPT

    # Only pass an explicit model when OpenAI is configured. Both paths
    # otherwise build the agent identically, so construct it once below.
    model_kwargs = {}
    openai_api_key = os.environ.get("OPENAI_API_KEY")
    if openai_api_key:
        model_kwargs["model"] = OpenAIModel(
            client_args={"api_key": openai_api_key},
            model_id="gpt-4o",
            params={"temperature": 0.1, "max_tokens": 2048},
        )
    else:
        logger.warning("OPENAI_API_KEY not set; using the Strands Agent default model.")

    conversation_manager = SlidingWindowConversationManager(
        window_size=20,  # Maximum number of messages to keep
        should_truncate_results=True,  # Truncate tool results too large for the model's context window
    )
    _cached_agent = Agent(
        system_prompt=system_prompt,
        tools=tools,
        conversation_manager=conversation_manager,
        callback_handler=PrintingCallbackHandler(),  # Prints text and shows tool usage
        **model_kwargs,
    )
    return _cached_agent
# Attachment formats sent to the model as image content blocks rather than
# converted to text ("jpg" is normalised to "jpeg").
_IMAGE_FORMATS = {"png", "jpeg", "gif", "webp", "jpg"}


def _decode_file_payload(file_obj: FilePayload) -> bytes:
    """Decode an attachment's base64 payload, dropping any data-URL prefix."""
    base64_data = file_obj.data
    if "," in base64_data:
        # e.g. "data:application/pdf;base64,<payload>" -> "<payload>"
        base64_data = base64_data.split(",", 1)[1]
    return base64.b64decode(base64_data)


def _extract_pdf_text(file_bytes: bytes, name: str) -> str:
    """Return the text of every PDF page, or a bracketed placeholder/error note."""
    # Imported here so a missing pypdf only disables PDFs, not all uploads.
    try:
        import pypdf
    except ImportError:
        logger.warning("pypdf not installed. PDF support disabled.")
        return "[PDF support is not available. Please install pypdf.]"
    try:
        pdf_reader = pypdf.PdfReader(io.BytesIO(file_bytes))
        parts = []
        for i, page in enumerate(pdf_reader.pages):
            try:
                text = page.extract_text()
            except Exception as page_err:
                # One unreadable page should not lose the rest of the document.
                logger.warning(f"Failed to extract text from page {i} of {name}: {page_err}")
                parts.append(f"[Error extracting page {i+1}]\n")
                continue
            if text:
                parts.append(text + "\n")
        extracted_text = "".join(parts)
    except Exception as pdf_err:
        logger.error(f"PDF extraction failed for {name}: {pdf_err}")
        return f"[Error extracting PDF text for {name}: {str(pdf_err)}]"
    if not extracted_text.strip():
        return "[No text could be extracted from this PDF. It might be an image-only PDF.]"
    return extracted_text


def _extract_docx_text(file_bytes: bytes, name: str) -> str:
    """Return the paragraph text of a DOCX file, or a bracketed placeholder/error note."""
    try:
        import docx
    except ImportError:
        logger.warning("python-docx not installed. DOCX support disabled.")
        return "[DOCX support is not available. Please install python-docx.]"
    try:
        doc = docx.Document(io.BytesIO(file_bytes))
        extracted_text = "\n".join(para.text for para in doc.paragraphs)
    except Exception as docx_err:
        logger.error(f"DOCX extraction failed for {name}: {docx_err}")
        return f"[Error extracting DOCX text for {name}: {str(docx_err)}]"
    if not extracted_text.strip():
        return "[No text found in this DOCX file.]"
    return extracted_text


def _extract_csv_text(file_bytes: bytes, name: str) -> str:
    """Return CSV content one row per line, keeping fields that need quoting quoted."""
    try:
        # utf-8-sig also strips a leading UTF-8 byte-order mark if present.
        csv_text = file_bytes.decode("utf-8-sig", errors="replace")
        buffer = io.StringIO()
        # Re-serialise with csv.writer: naively joining parsed fields with ","
        # makes a field such as "Smith, John" indistinguishable from two fields.
        csv.writer(buffer, lineterminator="\n").writerows(csv.reader(io.StringIO(csv_text)))
        extracted_text = buffer.getvalue()
        if extracted_text.endswith("\n"):
            extracted_text = extracted_text[:-1]
    except Exception as csv_err:
        logger.error(f"CSV extraction failed for {name}: {csv_err}")
        return f"[Error extracting CSV text for {name}: {str(csv_err)}]"
    if not extracted_text.strip():
        return "[Empty CSV file.]"
    return extracted_text


def _extract_document_text(fmt: str, file_bytes: bytes, name: str) -> str:
    """Convert a non-image attachment to text based on its lower-cased format."""
    if fmt == "pdf":
        return _extract_pdf_text(file_bytes, name)
    if fmt == "docx":
        return _extract_docx_text(file_bytes, name)
    if fmt == "csv":
        return _extract_csv_text(file_bytes, name)
    # Anything else (txt, md, html, ...) is treated as UTF-8 text.
    try:
        return file_bytes.decode("utf-8", errors="replace")
    except Exception as dec_err:
        logger.error(f"Text decoding failed for {name}: {dec_err}")
        return f"[Error decoding text for {name}]"


def query_agent(question: str, files: Optional[List[FilePayload]] = None, return_full_result: bool = False):
    """Send a question, with optional attachments, to the shared fraud agent.

    Image attachments (png/jpeg/jpg/gif/webp) go to the agent as image content
    blocks. Every other attachment is converted to text and appended to the
    question under a "--- Content from <name> ---" header. An attachment that
    cannot be processed is replaced by an error note instead of being dropped.

    Args:
        question: The user's question.
        files: Optional base64-encoded attachments.
        return_full_result: Return the raw agent result instead of ``str(result)``.

    Returns:
        The agent's answer (a string unless ``return_full_result`` is True).
        Any ``Exception`` is logged with its traceback and counted in
        ``_metrics``, and a string starting with "Error:" is returned instead.
    """
    try:
        logger.info(f"Processing query: {question}")
        if files:
            logger.info(f"Query includes {len(files)} files")
        agent = create_enhanced_agent()

        combined_text = question
        image_blocks = []
        if files:
            try:
                from strands.types.content import ImageContent
                from strands.types.media import ImageSource
            except ImportError as ie:
                logger.error(f"Missing dependency for file processing: {ie}")
                return "Error: Server missing dependencies (pypdf, python-docx, or strands types) for file processing."

            for file_obj in files:
                try:
                    file_bytes = _decode_file_payload(file_obj)
                    fmt = file_obj.format.lower()
                    if fmt in _IMAGE_FORMATS:
                        image_blocks.append({
                            "image": ImageContent(
                                format=fmt if fmt != "jpg" else "jpeg",
                                source=ImageSource(bytes=file_bytes),
                            )
                        })
                        continue
                    extracted_text = _extract_document_text(fmt, file_bytes, file_obj.name)
                except Exception as err:
                    logger.error(f"Failed to process file {file_obj.name}: {err}")
                    # Tell the model the attachment was unreadable rather than silently omitting it.
                    extracted_text = f"[Error processing file {file_obj.name}: {err}]"
                combined_text += f"\n\n--- Content from {file_obj.name} ---\n{extracted_text}\n-----------------------------------\n"

        # Text first, followed by any image blocks.
        message_content = [{"text": combined_text}, *image_blocks]
        result = agent(message_content)
        logger.info("Query completed successfully")
        return result if return_full_result else str(result)
    except Exception as e:
        logger.exception(f"Query failed: {e}")
        _metrics.record_error()
        return f"Error: {str(e)}"
# =============================================================================
# FASTAPI APPLICATION
# =============================================================================
app = FastAPI(title="Fraud Model Explainability Assistant")

# Browser origins allowed to call the API cross-origin, given as a
# comma-separated CORS_ALLOW_ORIGINS value. The default "*" (any origin)
# matches the previous behaviour. The bundled UI served at "/" presumably calls
# the API same-origin, in which case this setting does not affect it.
_cors_allow_origins = [
    origin.strip()
    for origin in os.getenv("CORS_ALLOW_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_allow_origins,
    allow_methods=["*"],
    allow_headers=["*"],
    # Credentials are enabled only for an explicit allow-list. With a "*" origin
    # plus allow_credentials=True, Starlette echoes back whatever Origin the
    # browser sends, which would let any website make credentialed requests.
    allow_credentials="*" not in _cors_allow_origins,
)


# Response body of POST /api/ask.
class AnswerResponse(BaseModel):
    answer: str  # Agent answer (or an "Error: ..." message from query_agent)
    metrics: dict  # Snapshot from ConfluenceMetrics.get_stats()
@app.get("/")
async def index():
    """Return the single-page chat UI as an HTML response."""
    page = get_ui_html()
    return HTMLResponse(content=page)
# Serialises calls into the process-wide agent built by create_enhanced_agent.
# That agent holds a single conversation history, so concurrent invocations
# could interleave their messages.
_agent_call_lock = threading.Lock()


def _query_agent_serialized(question: str, files: Optional[List[FilePayload]]) -> str:
    """Run query_agent while holding the agent lock. Called from a worker thread."""
    with _agent_call_lock:
        return query_agent(question, files)


@app.post("/api/ask", response_model=AnswerResponse)
async def ask_question(request: QuestionRequest):
    """Answer a question (with optional attachments) and include current metrics.

    query_agent is synchronous and can block for a long time (LLM and tool
    calls). It therefore runs in FastAPI's threadpool rather than on the event
    loop, which keeps /api/health and /api/metrics responsive meanwhile.
    """
    from fastapi.concurrency import run_in_threadpool

    try:
        answer = await run_in_threadpool(
            _query_agent_serialized, request.question, request.files
        )
        return AnswerResponse(
            answer=answer,
            metrics=_metrics.get_stats(),
        )
    except Exception as e:
        logger.error(f"API error: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/api/metrics")
async def get_metrics():
    """Expose a snapshot of the Confluence metrics counters as JSON."""
    stats = _metrics.get_stats()
    return stats
@app.get("/api/health")
async def health_check():
    """Report liveness, whether the Confluence RAG is initialised, and metrics."""
    rag_ready = _confluence_rag is not None
    payload = {
        "status": "healthy",
        "confluence_initialized": rag_ready,
        "metrics": _metrics.get_stats(),
    }
    return payload
# =============================================================================
# HTML UI
# =============================================================================
def get_ui_html() -> str:
"""Generate the chat UI HTML served at "/".
Returns:
The full page as a string (an f-string template).
NOTE(review): the template text visible here looks extraction-damaged
(markup stripped, garbled emoji); verify it against the real file.
"""
# Suggested prompts for the UI; serialised to JSON below.
example_questions = [
"Why was application APP-78432 flagged as high risk?",
"Explain the fraud score for APP-12345 and compare it to approved applications",
"Check fair lending compliance for APP-55555 and cite relevant policies",
"Show me the identity network analysis for APP-78432",
"What's the current model performance for the Retail Card portfolio?",
"What does our fair lending policy say about synthetic ID detection?",
"Find the model validation report for XGBoost v3.2",
"What are the procedures for escalating high-risk applications?",
]
# NOTE(review): examples_json is not referenced in the template text visible
# here; presumably it is interpolated into markup/script that was lost.
# Confirm it is still used.
examples_json = json.dumps(example_questions)
# The metric placeholders (0 / 0% / 0s / 0) are presumably updated client-side
# from /api/metrics; confirm against the full template.
return f"""
Fraud Model Explainability Assistant
š Fraud Model Explainability Assistant
Production-Ready with Confluence Integration
Welcome! Ask me about:
⢠Application fraud scores and explanations
⢠Fair lending compliance checks
⢠Identity network analysis
⢠Model performance metrics
⢠Confluence documentation and policies
Enter your question above and click "Analyze" to get started.
š Performance Metrics
0
Searches
0%
Cache Rate
0s
Avg Time
0
Errors
š” Example Questions
⨠Production Features
Structured logging
Performance tracking
Error monitoring
Daily auto-refresh (2 AM)
Confluence integration
"""
# =============================================================================
# MAIN ENTRYPOINT
# =============================================================================
if __name__ == "__main__":
    import uvicorn

    # Pre-initialize Confluence so the first request does not pay the
    # ingestion cost inside create_enhanced_agent().
    # NOTE(review): this block and the scheduler only run under
    # `python <this file>`. Launching with `uvicorn module:app` skips both.
    confluence_ready = False
    try:
        init_confluence()
        confluence_ready = True
    except Exception as e:
        logger.error(f"Confluence initialization failed: {e}")
        print(f"Warning: Confluence initialization failed: {e}")
        print("App will run without Confluence integration.")

    # None when apscheduler is not installed.
    scheduler = setup_scheduled_ingestion()

    port = int(os.getenv("PORT", "7860"))

    # Launch FastAPI with uvicorn. The banner reflects what actually started.
    logger.info("Launching FastAPI server...")
    print("\n" + "=" * 60)
    print("Fraud Model Explainability Assistant")
    print(f" - FastAPI backend on port {port}")
    if confluence_ready:
        print(" - Confluence integration enabled")
    else:
        print(" - Confluence integration disabled")
    if scheduler is not None:
        print(" - Scheduled refresh at 2 AM daily")
    else:
        print(" - Scheduled refresh disabled (apscheduler not installed)")
    print("=" * 60 + "\n")
    try:
        uvicorn.run(
            app,
            host="0.0.0.0",
            port=port,
            reload=False,
        )
    finally:
        # Stop the background re-ingestion thread when the server exits.
        if scheduler is not None:
            scheduler.shutdown(wait=False)