DocUA's picture
feat: introduce configurable content options and privacy controls for spiritual care messages.
50012e7
#!/usr/bin/env python3
"""
Universal AI Client for Lifestyle Journey Application
This module provides a unified interface for different AI providers (Google Gemini, Anthropic Claude)
with automatic fallback and provider-specific optimizations.
"""
import os
import json
import logging
import base64
import tempfile
from datetime import datetime
from typing import Optional, Dict, Any, List
from abc import ABC, abstractmethod
# Import configurations
from src.config.ai_providers_config import (
AIProvider, AIModel, get_agent_config, get_provider_config,
is_provider_available, get_available_providers
)
# Import provider-specific clients
try:
import google.genai as genai
from google.genai import types
GEMINI_AVAILABLE = True
except ImportError:
GEMINI_AVAILABLE = False
try:
import anthropic
ANTHROPIC_AVAILABLE = True
except ImportError:
ANTHROPIC_AVAILABLE = False
class BaseAIClient(ABC):
"""Abstract base class for AI clients"""
def __init__(self, provider: AIProvider, model: AIModel, temperature: float = 0.3):
self.provider = provider
self.model = model
self.temperature = temperature
self.call_counter = 0
@abstractmethod
def generate_response(self, system_prompt: str, user_prompt: str, temperature: Optional[float] = None) -> str:
"""Generate response from AI model"""
pass
def _log_interaction(self, system_prompt: str, user_prompt: str, response: str, call_type: str = ""):
"""Log AI interaction if logging is enabled"""
log_prompts_enabled = os.getenv("LOG_PROMPTS", "false").lower() == "true"
if not log_prompts_enabled:
return
logger = logging.getLogger(f"{__name__}.{self.provider.value}")
if not logger.handlers:
logger.setLevel(logging.INFO)
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.addHandler(console_handler)
file_handler = logging.FileHandler('ai_interactions.log', encoding='utf-8')
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.addHandler(file_handler)
self.call_counter += 1
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log_message = f"""
{'='*80}
{self.provider.value.upper()} API CALL #{self.call_counter} [{call_type}] - {timestamp}
{'='*80}
SYSTEM PROMPT:
{'-'*40}
{system_prompt}
USER PROMPT:
{'-'*40}
{user_prompt}
AI RESPONSE:
{'-'*40}
{response}
MODEL: {self.model.value}
TEMPERATURE: {self.temperature}
{'='*80}
"""
logger.info(log_message)
class GeminiClient(BaseAIClient):
"""Google Gemini AI client using the new google-genai library"""
def __init__(self, model: AIModel, temperature: float = 0.3):
super().__init__(AIProvider.GEMINI, model, temperature)
if not GEMINI_AVAILABLE:
raise ImportError("Google GenAI library not available. Install with: pip install google-genai")
gcp_base64 = os.getenv("GCP_SERVICE_ACCOUNT_B64")
api_key = os.getenv("GEMINI_API_KEY")
if gcp_base64:
try:
# Decode Service Account JSON from Base64
creds_json = base64.b64decode(gcp_base64).decode('utf-8')
# Create temporary file for credentials
# Library expects a file path in GOOGLE_APPLICATION_CREDENTIALS
self.temp_file = tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False)
self.temp_file.write(creds_json)
self.temp_file.flush()
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self.temp_file.name
# Read project_id from credentials to initialize Vertex AI
creds_dict = json.loads(creds_json)
project_id = creds_dict.get("project_id")
# Initialize client with Vertex AI mode
self.client = genai.Client(
vertexai=True,
project=project_id,
location=os.getenv("GOOGLE_CLOUD_LOCATION", "us-central1")
)
logging.info(f"Initialized Gemini via Vertex AI (Project: {project_id})")
except Exception as e:
logging.error(f"Error initializing Gemini with Service Account: {e}")
# Fallback to API Key if Service Account fails
if api_key:
self.client = genai.Client(api_key=api_key)
logging.info("Fallback: Initialized Gemini via API Key after SA failure")
else:
raise ValueError(f"Failed to initialize Gemini with Service Account and no API Key found: {e}")
elif api_key:
# Traditional API Key initialization
self.client = genai.Client(api_key=api_key)
logging.info("Initialized Gemini via API Key")
else:
raise ValueError("No Gemini configuration found (GCP_SERVICE_ACCOUNT_B64 or GEMINI_API_KEY)")
self.model_name = model.value
def generate_response(self, system_prompt: str, user_prompt: str, temperature: Optional[float] = None) -> str:
"""Generate response from Gemini using the new API"""
if temperature is None:
temperature = self.temperature
try:
# Prepare the content parts
contents = [
types.Content(
role="user",
parts=[types.Part.from_text(text=user_prompt)],
)
]
# Configure generation settings
config_params = {
"temperature": temperature,
"thinking_config": types.ThinkingConfig(thinking_budget=0),
}
# Add system prompt if provided
if system_prompt:
config_params["system_instruction"] = [
types.Part.from_text(text=system_prompt)
]
config = types.GenerateContentConfig(**config_params)
# Generate the response
response_text = ""
for chunk in self.client.models.generate_content_stream(
model=self.model_name,
contents=contents,
config=config,
):
if chunk.text:
response_text += chunk.text
return response_text
except Exception as e:
error_msg = f"Gemini API error: {str(e)}"
logging.error(error_msg)
# Classify error type for better handling
if "rate limit" in str(e).lower() or "quota" in str(e).lower():
raise ValueError(f"Rate limit exceeded: {str(e)}") from e
elif "timeout" in str(e).lower() or "deadline" in str(e).lower():
raise TimeoutError(f"Request timeout: {str(e)}") from e
elif "connection" in str(e).lower() or "network" in str(e).lower():
raise ConnectionError(f"Network error: {str(e)}") from e
else:
raise RuntimeError(error_msg) from e
class AnthropicClient(BaseAIClient):
"""Anthropic Claude AI client"""
def __init__(self, model: AIModel, temperature: float = 0.3):
super().__init__(AIProvider.ANTHROPIC, model, temperature)
if not ANTHROPIC_AVAILABLE:
raise ImportError("Anthropic library not available. Install with: pip install anthropic")
api_key = os.getenv("ANTHROPIC_API_KEY")
if not api_key:
raise ValueError("ANTHROPIC_API_KEY environment variable not set")
self.client = anthropic.Anthropic(api_key=api_key)
def generate_response(self, system_prompt: str, user_prompt: str, temperature: Optional[float] = None) -> str:
"""Generate response from Claude"""
temp = temperature if temperature is not None else self.temperature
try:
message = self.client.messages.create(
model=self.model.value,
max_tokens=20000,
temperature=temp,
system=system_prompt,
messages=[
{
"role": "user",
"content": [
{
"type": "text",
"text": user_prompt
}
]
}
]
)
# Extract text content from response
response = ""
for content_block in message.content:
if hasattr(content_block, 'text'):
response += content_block.text
elif isinstance(content_block, dict) and 'text' in content_block:
response += content_block['text']
return response.strip()
except Exception as e:
error_msg = f"Anthropic API error: {str(e)}"
logging.error(error_msg)
# Classify error type for better handling
if "rate_limit" in str(e).lower() or "rate limit" in str(e).lower():
raise ValueError(f"Rate limit exceeded: {str(e)}") from e
elif "timeout" in str(e).lower():
raise TimeoutError(f"Request timeout: {str(e)}") from e
elif "connection" in str(e).lower() or "network" in str(e).lower():
raise ConnectionError(f"Network error: {str(e)}") from e
else:
raise RuntimeError(error_msg) from e
class UniversalAIClient:
"""
Universal AI client that automatically selects the appropriate provider
based on agent configuration and availability
"""
def __init__(self, agent_name: str, model_override: Optional[str] = None):
self.agent_name = agent_name
self.model_override = model_override
self.config = get_agent_config(agent_name)
self.client = None
self.fallback_client = None
self._initialize_clients()
@staticmethod
def _resolve_override_model(model_override: str) -> tuple[Optional[AIProvider], Optional[AIModel]]:
"""Resolve a UI-provided model string into provider+AIModel.
Expected strings (from UI dropdowns):
- gemini-2.5-flash / gemini-2.0-flash / gemini-3-flash-preview
- claude-sonnet-4-5-20250929 / claude-sonnet-4-20250514 / claude-3-7-sonnet-20250219 / ...
"""
if not model_override:
return None, None
override = model_override.strip()
if not override:
return None, None
try:
if override.startswith("gemini"):
return AIProvider.GEMINI, AIModel(override)
if override.startswith("claude"):
return AIProvider.ANTHROPIC, AIModel(override)
except Exception:
return None, None
return None, None
def _initialize_clients(self):
"""Initialize primary and fallback clients"""
primary_provider = self.config["provider"]
primary_model = self.config["model"]
temperature = self.config.get("temperature", 0.3)
# Optional: override model/provider (session-level setting from UI)
if self.model_override:
override_provider, override_model = self._resolve_override_model(self.model_override)
if override_provider is not None and override_model is not None:
primary_provider = override_provider
primary_model = override_model
# Try to initialize primary client
try:
if primary_provider == AIProvider.GEMINI and is_provider_available(AIProvider.GEMINI):
self.client = GeminiClient(primary_model, temperature)
elif primary_provider == AIProvider.ANTHROPIC and is_provider_available(AIProvider.ANTHROPIC):
self.client = AnthropicClient(primary_model, temperature)
except Exception as e:
print(f" Failed to initialize primary client for {self.agent_name}: {e}")
# Initialize fallback client if primary failed or unavailable
if self.client is None:
available_providers = get_available_providers()
for provider in available_providers:
try:
provider_config = get_provider_config(provider)
fallback_model = provider_config["default_model"]
if provider == AIProvider.GEMINI:
self.fallback_client = GeminiClient(fallback_model, temperature)
print(f" Using Gemini fallback for {self.agent_name}")
break
elif provider == AIProvider.ANTHROPIC:
self.fallback_client = AnthropicClient(fallback_model, temperature)
print(f" Using Anthropic fallback for {self.agent_name}")
break
except Exception as e:
print(f" Failed to initialize fallback {provider.value}: {e}")
continue
# Final check
if self.client is None and self.fallback_client is None:
raise RuntimeError(f"No AI providers available for {self.agent_name}")
def generate_response(self, system_prompt: str, user_prompt: str, temperature: Optional[float] = None, call_type: str = "") -> str:
"""
Generate response using primary client or fallback
Args:
system_prompt: System instruction for the AI
user_prompt: User message/prompt
temperature: Optional temperature override
call_type: Type of call for logging purposes
Returns:
AI-generated response text
"""
active_client = self.client or self.fallback_client
if active_client is None:
raise RuntimeError(f"No AI client available for {self.agent_name}")
try:
response = active_client.generate_response(system_prompt, user_prompt, temperature)
active_client._log_interaction(system_prompt, user_prompt, response, call_type)
return response
except Exception as e:
# If primary client fails, try fallback
if self.client is not None and self.fallback_client is not None and active_client == self.client:
print(f" Primary client failed for {self.agent_name}, trying fallback: {e}")
try:
response = self.fallback_client.generate_response(system_prompt, user_prompt, temperature)
self.fallback_client._log_interaction(system_prompt, user_prompt, response, f"{call_type}_FALLBACK")
return response
except Exception as fallback_error:
raise RuntimeError(f"Both primary and fallback clients failed: {e}, {fallback_error}")
else:
raise RuntimeError(f"AI client error for {self.agent_name}: {e}")
def get_client_info(self) -> Dict[str, Any]:
"""Get information about the active client configuration"""
active_client = self.client or self.fallback_client
return {
"agent_name": self.agent_name,
"configured_provider": self.config["provider"].value,
"configured_model": self.config["model"].value,
"active_provider": active_client.provider.value if active_client else None,
"active_model": active_client.model.value if active_client else None,
"using_fallback": self.client is None and self.fallback_client is not None,
"reasoning": self.config.get("reasoning", "No reasoning provided")
}
class AIClientManager:
"""
Strategic Enhancement: Multi-Provider AI Client Management
Design Philosophy:
- Maintain complete backward compatibility with existing GeminiAPI interface
- Add intelligent provider routing based on medical context
- Enable systematic optimization of AI provider effectiveness
- Implement comprehensive fallback and error recovery
"""
def __init__(self):
self._clients = {} # Cache for AI clients
self.call_counter = 0 # Backward compatibility
# Optional: allow an owning session/app to attach per-session overrides.
# Expected shape: {agent_name: model_string}
self.model_overrides: Dict[str, str] = {}
# Optional per-session prompt overrides.
# Expected shape: {agent_name: system_prompt_string}
self.prompt_overrides = {}
def set_model_overrides(self, overrides: Optional[Dict[str, str]] = None) -> None:
"""Set per-session model overrides.
This is intentionally a thin setter so multiple UI controllers
(chat / manual input / file upload) can share the same mechanism.
"""
self.model_overrides = dict(overrides or {})
def set_prompt_overrides(self, overrides: Optional[Dict[str, str]] = None) -> None:
"""Set per-session prompt overrides.
This avoids mutating module-level prompt constants and prevents
cross-session leakage.
Expected keys are agent names, e.g.:
- SpiritualDistressAnalyzer
- SoftSpiritualTriage
- TriageResponseEvaluator
- MedicalAssistant
- SoftMedicalTriage
- EntryClassifier
"""
self.prompt_overrides = dict(overrides or {})
# NEW: Enhanced client management for medical AI optimization
self.provider_performance_metrics = {}
self.medical_context_routing = {}
# Enhanced client retrieval with performance tracking
def get_client(self, agent_name: str, model_override: Optional[str] = None):
"""Get or create an AI client for the specified agent.
If `model_override` is provided, a new (non-cached) client is returned
to avoid cross-session leakage.
"""
if model_override:
return create_ai_client(agent_name, model_override=model_override)
if agent_name not in self._clients:
self._clients[agent_name] = create_ai_client(agent_name)
return self._clients[agent_name]
def generate_response(self, system_prompt: str, user_prompt: str,
temperature: float = None, call_type: str = "",
agent_name: str = "DefaultAgent",
medical_context: Optional[Dict] = None,
model_override: Optional[str] = None):
"""
Enhanced response generation with medical context awareness
Strategic Enhancement:
- Add medical context routing for improved safety
- Track provider performance for optimization
- Implement comprehensive error handling
- Maintain full backward compatibility
"""
try:
client = self.get_client(agent_name, model_override=model_override)
response = client.generate_response(
system_prompt=system_prompt,
user_prompt=user_prompt,
temperature=temperature,
call_type=call_type
)
self.call_counter += 1
return response
except Exception as e:
# TODO: Implement proper error handling and fallback
print(f"Error generating response: {e}")
raise
def _update_performance_metrics(self, agent_name: str, response_time: float,
success: bool, medical_context: Optional[Dict]):
"""Update performance metrics for continuous optimization"""
if agent_name not in self.provider_performance_metrics:
self.provider_performance_metrics[agent_name] = {
'total_calls': 0,
'successful_calls': 0,
'total_response_time': 0.0,
'last_error': None
}
metrics = self.provider_performance_metrics[agent_name]
metrics['total_calls'] += 1
metrics['total_response_time'] += response_time
if success:
metrics['successful_calls'] += 1
else:
metrics['last_error'] = str(datetime.now())
def get_client_info(self, agent_name: str) -> Dict[str, Any]:
"""Enhanced client information with performance analytics"""
client = self.get_client(agent_name)
metrics = self.provider_performance_metrics.get(agent_name, {})
return {
'agent_name': agent_name,
'call_count': self.call_counter,
'performance_metrics': metrics,
'client_info': client.get_client_info() if hasattr(client, 'get_client_info') else {}
}
def get_all_clients_info(self) -> Dict[str, Dict]:
"""Comprehensive client ecosystem status"""
return {name: self.get_client_info(name) for name in self._clients}
def call_spiritual_api(self, system_prompt: str, user_prompt: str,
temperature: float = 0.7,
model_override: Optional[str] = None) -> str:
"""
Call AI API for spiritual/emotional analysis.
Uses the spiritual analyzer agent configuration.
Args:
system_prompt: System prompt for the AI
user_prompt: User prompt/message to analyze
temperature: Temperature for response generation
Returns:
AI response as string
"""
if model_override is None and self.model_overrides:
model_override = self.model_overrides.get("SpiritualDistressAnalyzer")
if self.prompt_overrides and "SpiritualDistressAnalyzer" in self.prompt_overrides:
system_prompt = self.prompt_overrides["SpiritualDistressAnalyzer"]
return self.generate_response(
system_prompt=system_prompt,
user_prompt=user_prompt,
temperature=temperature,
call_type="spiritual_analysis",
agent_name="SpiritualDistressAnalyzer",
model_override=model_override,
)
def call_entry_classifier_api(self, system_prompt: str, user_prompt: str,
temperature: float = 0.3,
model_override: Optional[str] = None) -> str:
"""Call AI API for entry classification.
This is used by Enhanced Verification manual input / file upload modes.
"""
if model_override is None and self.model_overrides:
model_override = self.model_overrides.get("EntryClassifier")
if self.prompt_overrides and "EntryClassifier" in self.prompt_overrides:
system_prompt = self.prompt_overrides["EntryClassifier"]
return self.generate_response(
system_prompt=system_prompt,
user_prompt=user_prompt,
temperature=temperature,
call_type="entry_classification",
agent_name="EntryClassifier",
model_override=model_override,
)
def call_medical_api(self, system_prompt: str, user_prompt: str,
temperature: float = 0.3,
model_override: Optional[str] = None) -> str:
"""
Call AI API for medical assistance.
Uses the soft medical triage agent configuration.
Args:
system_prompt: System prompt for the AI
user_prompt: User prompt/message for medical guidance
temperature: Temperature for response generation
Returns:
AI response as string
"""
if model_override is None and self.model_overrides:
model_override = self.model_overrides.get("SoftMedicalTriage")
if self.prompt_overrides and "SoftMedicalTriage" in self.prompt_overrides:
system_prompt = self.prompt_overrides["SoftMedicalTriage"]
return self.generate_response(
system_prompt=system_prompt,
user_prompt=user_prompt,
temperature=temperature,
call_type="medical_assistance",
agent_name="SoftMedicalTriage",
model_override=model_override,
)
def call_soft_spiritual_triage_api(self, system_prompt: str, user_prompt: str,
temperature: float = 0.3,
model_override: Optional[str] = None) -> str:
"""Call AI API for soft spiritual triage question generation."""
if model_override is None and self.model_overrides:
model_override = self.model_overrides.get("SoftSpiritualTriage")
if self.prompt_overrides and "SoftSpiritualTriage" in self.prompt_overrides:
system_prompt = self.prompt_overrides["SoftSpiritualTriage"]
return self.generate_response(
system_prompt=system_prompt,
user_prompt=user_prompt,
temperature=temperature,
call_type="soft_spiritual_triage",
agent_name="SoftSpiritualTriage",
model_override=model_override,
)
def call_triage_response_evaluator_api(self, system_prompt: str, user_prompt: str,
temperature: float = 0.3,
model_override: Optional[str] = None) -> str:
"""Call AI API for triage response evaluation."""
if model_override is None and self.model_overrides:
model_override = self.model_overrides.get("TriageResponseEvaluator")
if self.prompt_overrides and "TriageResponseEvaluator" in self.prompt_overrides:
system_prompt = self.prompt_overrides["TriageResponseEvaluator"]
return self.generate_response(
system_prompt=system_prompt,
user_prompt=user_prompt,
temperature=temperature,
call_type="triage_response_evaluator",
agent_name="TriageResponseEvaluator",
model_override=model_override,
)
def call_medical_assistant_api(self, system_prompt: str, user_prompt: str,
temperature: float = 0.3,
model_override: Optional[str] = None) -> str:
"""Call AI API for medical assistant responses."""
if model_override is None and self.model_overrides:
model_override = self.model_overrides.get("MedicalAssistant")
if self.prompt_overrides and "MedicalAssistant" in self.prompt_overrides:
system_prompt = self.prompt_overrides["MedicalAssistant"]
return self.generate_response(
system_prompt=system_prompt,
user_prompt=user_prompt,
temperature=temperature,
call_type="medical_assistant",
agent_name="MedicalAssistant",
model_override=model_override,
)
# Factory function for easy client creation
def create_ai_client(agent_name: str, model_override: Optional[str] = None) -> UniversalAIClient:
"""
Create an AI client for a specific agent
Args:
agent_name: Name of the agent (e.g., "MainLifestyleAssistant")
Returns:
Configured UniversalAIClient instance
"""
return UniversalAIClient(agent_name, model_override=model_override)
if __name__ == "__main__":
print(" AI Client Test")
print("=" * 50)
# Test different agents
test_agents = ["MainLifestyleAssistant", "EntryClassifier", "MedicalAssistant"]
for agent_name in test_agents:
print(f"\n Testing {agent_name}:")
try:
client = create_ai_client(agent_name)
info = client.get_client_info()
print(f" Configured: {info['configured_provider']} ({info['configured_model']})")
print(f" Active: {info['active_provider']} ({info['active_model']})")
print(f" Fallback: {'Yes' if info['using_fallback'] else 'No'}")
print(f" Reasoning: {info['reasoning']}")
# Test a simple call
response = client.generate_response(
"You are a helpful assistant.",
"Say hello in one sentence.",
call_type="TEST"
)
print(f" Test response: {response[:100]}...")
except Exception as e:
print(f" Error: {e}")