#!/usr/bin/env python3 """ Universal AI Client for Lifestyle Journey Application This module provides a unified interface for different AI providers (Google Gemini, Anthropic Claude) with automatic fallback and provider-specific optimizations. """ import os import json import logging import base64 import tempfile from datetime import datetime from typing import Optional, Dict, Any, List from abc import ABC, abstractmethod # Import configurations from src.config.ai_providers_config import ( AIProvider, AIModel, get_agent_config, get_provider_config, is_provider_available, get_available_providers ) # Import provider-specific clients try: import google.genai as genai from google.genai import types GEMINI_AVAILABLE = True except ImportError: GEMINI_AVAILABLE = False try: import anthropic ANTHROPIC_AVAILABLE = True except ImportError: ANTHROPIC_AVAILABLE = False class BaseAIClient(ABC): """Abstract base class for AI clients""" def __init__(self, provider: AIProvider, model: AIModel, temperature: float = 0.3): self.provider = provider self.model = model self.temperature = temperature self.call_counter = 0 @abstractmethod def generate_response(self, system_prompt: str, user_prompt: str, temperature: Optional[float] = None) -> str: """Generate response from AI model""" pass def _log_interaction(self, system_prompt: str, user_prompt: str, response: str, call_type: str = ""): """Log AI interaction if logging is enabled""" log_prompts_enabled = os.getenv("LOG_PROMPTS", "false").lower() == "true" if not log_prompts_enabled: return logger = logging.getLogger(f"{__name__}.{self.provider.value}") if not logger.handlers: logger.setLevel(logging.INFO) console_handler = logging.StreamHandler() console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) logger.addHandler(console_handler) file_handler = logging.FileHandler('ai_interactions.log', encoding='utf-8') file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) logger.addHandler(file_handler) self.call_counter += 1 timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") log_message = f""" {'='*80} {self.provider.value.upper()} API CALL #{self.call_counter} [{call_type}] - {timestamp} {'='*80} SYSTEM PROMPT: {'-'*40} {system_prompt} USER PROMPT: {'-'*40} {user_prompt} AI RESPONSE: {'-'*40} {response} MODEL: {self.model.value} TEMPERATURE: {self.temperature} {'='*80} """ logger.info(log_message) class GeminiClient(BaseAIClient): """Google Gemini AI client using the new google-genai library""" def __init__(self, model: AIModel, temperature: float = 0.3): super().__init__(AIProvider.GEMINI, model, temperature) if not GEMINI_AVAILABLE: raise ImportError("Google GenAI library not available. Install with: pip install google-genai") gcp_base64 = os.getenv("GCP_SERVICE_ACCOUNT_B64") api_key = os.getenv("GEMINI_API_KEY") if gcp_base64: try: # Decode Service Account JSON from Base64 creds_json = base64.b64decode(gcp_base64).decode('utf-8') # Create temporary file for credentials # Library expects a file path in GOOGLE_APPLICATION_CREDENTIALS self.temp_file = tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) self.temp_file.write(creds_json) self.temp_file.flush() os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self.temp_file.name # Read project_id from credentials to initialize Vertex AI creds_dict = json.loads(creds_json) project_id = creds_dict.get("project_id") # Initialize client with Vertex AI mode self.client = genai.Client( vertexai=True, project=project_id, location=os.getenv("GOOGLE_CLOUD_LOCATION", "us-central1") ) logging.info(f"Initialized Gemini via Vertex AI (Project: {project_id})") except Exception as e: logging.error(f"Error initializing Gemini with Service Account: {e}") # Fallback to API Key if Service Account fails if api_key: self.client = genai.Client(api_key=api_key) logging.info("Fallback: Initialized Gemini via API Key after SA failure") else: raise ValueError(f"Failed to initialize Gemini with Service Account and no API Key found: {e}") elif api_key: # Traditional API Key initialization self.client = genai.Client(api_key=api_key) logging.info("Initialized Gemini via API Key") else: raise ValueError("No Gemini configuration found (GCP_SERVICE_ACCOUNT_B64 or GEMINI_API_KEY)") self.model_name = model.value def generate_response(self, system_prompt: str, user_prompt: str, temperature: Optional[float] = None) -> str: """Generate response from Gemini using the new API""" if temperature is None: temperature = self.temperature try: # Prepare the content parts contents = [ types.Content( role="user", parts=[types.Part.from_text(text=user_prompt)], ) ] # Configure generation settings config_params = { "temperature": temperature, "thinking_config": types.ThinkingConfig(thinking_budget=0), } # Add system prompt if provided if system_prompt: config_params["system_instruction"] = [ types.Part.from_text(text=system_prompt) ] config = types.GenerateContentConfig(**config_params) # Generate the response response_text = "" for chunk in self.client.models.generate_content_stream( model=self.model_name, contents=contents, config=config, ): if chunk.text: response_text += chunk.text return response_text except Exception as e: error_msg = f"Gemini API error: {str(e)}" logging.error(error_msg) # Classify error type for better handling if "rate limit" in str(e).lower() or "quota" in str(e).lower(): raise ValueError(f"Rate limit exceeded: {str(e)}") from e elif "timeout" in str(e).lower() or "deadline" in str(e).lower(): raise TimeoutError(f"Request timeout: {str(e)}") from e elif "connection" in str(e).lower() or "network" in str(e).lower(): raise ConnectionError(f"Network error: {str(e)}") from e else: raise RuntimeError(error_msg) from e class AnthropicClient(BaseAIClient): """Anthropic Claude AI client""" def __init__(self, model: AIModel, temperature: float = 0.3): super().__init__(AIProvider.ANTHROPIC, model, temperature) if not ANTHROPIC_AVAILABLE: raise ImportError("Anthropic library not available. Install with: pip install anthropic") api_key = os.getenv("ANTHROPIC_API_KEY") if not api_key: raise ValueError("ANTHROPIC_API_KEY environment variable not set") self.client = anthropic.Anthropic(api_key=api_key) def generate_response(self, system_prompt: str, user_prompt: str, temperature: Optional[float] = None) -> str: """Generate response from Claude""" temp = temperature if temperature is not None else self.temperature try: message = self.client.messages.create( model=self.model.value, max_tokens=20000, temperature=temp, system=system_prompt, messages=[ { "role": "user", "content": [ { "type": "text", "text": user_prompt } ] } ] ) # Extract text content from response response = "" for content_block in message.content: if hasattr(content_block, 'text'): response += content_block.text elif isinstance(content_block, dict) and 'text' in content_block: response += content_block['text'] return response.strip() except Exception as e: error_msg = f"Anthropic API error: {str(e)}" logging.error(error_msg) # Classify error type for better handling if "rate_limit" in str(e).lower() or "rate limit" in str(e).lower(): raise ValueError(f"Rate limit exceeded: {str(e)}") from e elif "timeout" in str(e).lower(): raise TimeoutError(f"Request timeout: {str(e)}") from e elif "connection" in str(e).lower() or "network" in str(e).lower(): raise ConnectionError(f"Network error: {str(e)}") from e else: raise RuntimeError(error_msg) from e class UniversalAIClient: """ Universal AI client that automatically selects the appropriate provider based on agent configuration and availability """ def __init__(self, agent_name: str, model_override: Optional[str] = None): self.agent_name = agent_name self.model_override = model_override self.config = get_agent_config(agent_name) self.client = None self.fallback_client = None self._initialize_clients() @staticmethod def _resolve_override_model(model_override: str) -> tuple[Optional[AIProvider], Optional[AIModel]]: """Resolve a UI-provided model string into provider+AIModel. Expected strings (from UI dropdowns): - gemini-2.5-flash / gemini-2.0-flash / gemini-3-flash-preview - claude-sonnet-4-5-20250929 / claude-sonnet-4-20250514 / claude-3-7-sonnet-20250219 / ... """ if not model_override: return None, None override = model_override.strip() if not override: return None, None try: if override.startswith("gemini"): return AIProvider.GEMINI, AIModel(override) if override.startswith("claude"): return AIProvider.ANTHROPIC, AIModel(override) except Exception: return None, None return None, None def _initialize_clients(self): """Initialize primary and fallback clients""" primary_provider = self.config["provider"] primary_model = self.config["model"] temperature = self.config.get("temperature", 0.3) # Optional: override model/provider (session-level setting from UI) if self.model_override: override_provider, override_model = self._resolve_override_model(self.model_override) if override_provider is not None and override_model is not None: primary_provider = override_provider primary_model = override_model # Try to initialize primary client try: if primary_provider == AIProvider.GEMINI and is_provider_available(AIProvider.GEMINI): self.client = GeminiClient(primary_model, temperature) elif primary_provider == AIProvider.ANTHROPIC and is_provider_available(AIProvider.ANTHROPIC): self.client = AnthropicClient(primary_model, temperature) except Exception as e: print(f" Failed to initialize primary client for {self.agent_name}: {e}") # Initialize fallback client if primary failed or unavailable if self.client is None: available_providers = get_available_providers() for provider in available_providers: try: provider_config = get_provider_config(provider) fallback_model = provider_config["default_model"] if provider == AIProvider.GEMINI: self.fallback_client = GeminiClient(fallback_model, temperature) print(f" Using Gemini fallback for {self.agent_name}") break elif provider == AIProvider.ANTHROPIC: self.fallback_client = AnthropicClient(fallback_model, temperature) print(f" Using Anthropic fallback for {self.agent_name}") break except Exception as e: print(f" Failed to initialize fallback {provider.value}: {e}") continue # Final check if self.client is None and self.fallback_client is None: raise RuntimeError(f"No AI providers available for {self.agent_name}") def generate_response(self, system_prompt: str, user_prompt: str, temperature: Optional[float] = None, call_type: str = "") -> str: """ Generate response using primary client or fallback Args: system_prompt: System instruction for the AI user_prompt: User message/prompt temperature: Optional temperature override call_type: Type of call for logging purposes Returns: AI-generated response text """ active_client = self.client or self.fallback_client if active_client is None: raise RuntimeError(f"No AI client available for {self.agent_name}") try: response = active_client.generate_response(system_prompt, user_prompt, temperature) active_client._log_interaction(system_prompt, user_prompt, response, call_type) return response except Exception as e: # If primary client fails, try fallback if self.client is not None and self.fallback_client is not None and active_client == self.client: print(f" Primary client failed for {self.agent_name}, trying fallback: {e}") try: response = self.fallback_client.generate_response(system_prompt, user_prompt, temperature) self.fallback_client._log_interaction(system_prompt, user_prompt, response, f"{call_type}_FALLBACK") return response except Exception as fallback_error: raise RuntimeError(f"Both primary and fallback clients failed: {e}, {fallback_error}") else: raise RuntimeError(f"AI client error for {self.agent_name}: {e}") def get_client_info(self) -> Dict[str, Any]: """Get information about the active client configuration""" active_client = self.client or self.fallback_client return { "agent_name": self.agent_name, "configured_provider": self.config["provider"].value, "configured_model": self.config["model"].value, "active_provider": active_client.provider.value if active_client else None, "active_model": active_client.model.value if active_client else None, "using_fallback": self.client is None and self.fallback_client is not None, "reasoning": self.config.get("reasoning", "No reasoning provided") } class AIClientManager: """ Strategic Enhancement: Multi-Provider AI Client Management Design Philosophy: - Maintain complete backward compatibility with existing GeminiAPI interface - Add intelligent provider routing based on medical context - Enable systematic optimization of AI provider effectiveness - Implement comprehensive fallback and error recovery """ def __init__(self): self._clients = {} # Cache for AI clients self.call_counter = 0 # Backward compatibility # Optional: allow an owning session/app to attach per-session overrides. # Expected shape: {agent_name: model_string} self.model_overrides: Dict[str, str] = {} # Optional per-session prompt overrides. # Expected shape: {agent_name: system_prompt_string} self.prompt_overrides = {} def set_model_overrides(self, overrides: Optional[Dict[str, str]] = None) -> None: """Set per-session model overrides. This is intentionally a thin setter so multiple UI controllers (chat / manual input / file upload) can share the same mechanism. """ self.model_overrides = dict(overrides or {}) def set_prompt_overrides(self, overrides: Optional[Dict[str, str]] = None) -> None: """Set per-session prompt overrides. This avoids mutating module-level prompt constants and prevents cross-session leakage. Expected keys are agent names, e.g.: - SpiritualDistressAnalyzer - SoftSpiritualTriage - TriageResponseEvaluator - MedicalAssistant - SoftMedicalTriage - EntryClassifier """ self.prompt_overrides = dict(overrides or {}) # NEW: Enhanced client management for medical AI optimization self.provider_performance_metrics = {} self.medical_context_routing = {} # Enhanced client retrieval with performance tracking def get_client(self, agent_name: str, model_override: Optional[str] = None): """Get or create an AI client for the specified agent. If `model_override` is provided, a new (non-cached) client is returned to avoid cross-session leakage. """ if model_override: return create_ai_client(agent_name, model_override=model_override) if agent_name not in self._clients: self._clients[agent_name] = create_ai_client(agent_name) return self._clients[agent_name] def generate_response(self, system_prompt: str, user_prompt: str, temperature: float = None, call_type: str = "", agent_name: str = "DefaultAgent", medical_context: Optional[Dict] = None, model_override: Optional[str] = None): """ Enhanced response generation with medical context awareness Strategic Enhancement: - Add medical context routing for improved safety - Track provider performance for optimization - Implement comprehensive error handling - Maintain full backward compatibility """ try: client = self.get_client(agent_name, model_override=model_override) response = client.generate_response( system_prompt=system_prompt, user_prompt=user_prompt, temperature=temperature, call_type=call_type ) self.call_counter += 1 return response except Exception as e: # TODO: Implement proper error handling and fallback print(f"Error generating response: {e}") raise def _update_performance_metrics(self, agent_name: str, response_time: float, success: bool, medical_context: Optional[Dict]): """Update performance metrics for continuous optimization""" if agent_name not in self.provider_performance_metrics: self.provider_performance_metrics[agent_name] = { 'total_calls': 0, 'successful_calls': 0, 'total_response_time': 0.0, 'last_error': None } metrics = self.provider_performance_metrics[agent_name] metrics['total_calls'] += 1 metrics['total_response_time'] += response_time if success: metrics['successful_calls'] += 1 else: metrics['last_error'] = str(datetime.now()) def get_client_info(self, agent_name: str) -> Dict[str, Any]: """Enhanced client information with performance analytics""" client = self.get_client(agent_name) metrics = self.provider_performance_metrics.get(agent_name, {}) return { 'agent_name': agent_name, 'call_count': self.call_counter, 'performance_metrics': metrics, 'client_info': client.get_client_info() if hasattr(client, 'get_client_info') else {} } def get_all_clients_info(self) -> Dict[str, Dict]: """Comprehensive client ecosystem status""" return {name: self.get_client_info(name) for name in self._clients} def call_spiritual_api(self, system_prompt: str, user_prompt: str, temperature: float = 0.7, model_override: Optional[str] = None) -> str: """ Call AI API for spiritual/emotional analysis. Uses the spiritual analyzer agent configuration. Args: system_prompt: System prompt for the AI user_prompt: User prompt/message to analyze temperature: Temperature for response generation Returns: AI response as string """ if model_override is None and self.model_overrides: model_override = self.model_overrides.get("SpiritualDistressAnalyzer") if self.prompt_overrides and "SpiritualDistressAnalyzer" in self.prompt_overrides: system_prompt = self.prompt_overrides["SpiritualDistressAnalyzer"] return self.generate_response( system_prompt=system_prompt, user_prompt=user_prompt, temperature=temperature, call_type="spiritual_analysis", agent_name="SpiritualDistressAnalyzer", model_override=model_override, ) def call_entry_classifier_api(self, system_prompt: str, user_prompt: str, temperature: float = 0.3, model_override: Optional[str] = None) -> str: """Call AI API for entry classification. This is used by Enhanced Verification manual input / file upload modes. """ if model_override is None and self.model_overrides: model_override = self.model_overrides.get("EntryClassifier") if self.prompt_overrides and "EntryClassifier" in self.prompt_overrides: system_prompt = self.prompt_overrides["EntryClassifier"] return self.generate_response( system_prompt=system_prompt, user_prompt=user_prompt, temperature=temperature, call_type="entry_classification", agent_name="EntryClassifier", model_override=model_override, ) def call_medical_api(self, system_prompt: str, user_prompt: str, temperature: float = 0.3, model_override: Optional[str] = None) -> str: """ Call AI API for medical assistance. Uses the soft medical triage agent configuration. Args: system_prompt: System prompt for the AI user_prompt: User prompt/message for medical guidance temperature: Temperature for response generation Returns: AI response as string """ if model_override is None and self.model_overrides: model_override = self.model_overrides.get("SoftMedicalTriage") if self.prompt_overrides and "SoftMedicalTriage" in self.prompt_overrides: system_prompt = self.prompt_overrides["SoftMedicalTriage"] return self.generate_response( system_prompt=system_prompt, user_prompt=user_prompt, temperature=temperature, call_type="medical_assistance", agent_name="SoftMedicalTriage", model_override=model_override, ) def call_soft_spiritual_triage_api(self, system_prompt: str, user_prompt: str, temperature: float = 0.3, model_override: Optional[str] = None) -> str: """Call AI API for soft spiritual triage question generation.""" if model_override is None and self.model_overrides: model_override = self.model_overrides.get("SoftSpiritualTriage") if self.prompt_overrides and "SoftSpiritualTriage" in self.prompt_overrides: system_prompt = self.prompt_overrides["SoftSpiritualTriage"] return self.generate_response( system_prompt=system_prompt, user_prompt=user_prompt, temperature=temperature, call_type="soft_spiritual_triage", agent_name="SoftSpiritualTriage", model_override=model_override, ) def call_triage_response_evaluator_api(self, system_prompt: str, user_prompt: str, temperature: float = 0.3, model_override: Optional[str] = None) -> str: """Call AI API for triage response evaluation.""" if model_override is None and self.model_overrides: model_override = self.model_overrides.get("TriageResponseEvaluator") if self.prompt_overrides and "TriageResponseEvaluator" in self.prompt_overrides: system_prompt = self.prompt_overrides["TriageResponseEvaluator"] return self.generate_response( system_prompt=system_prompt, user_prompt=user_prompt, temperature=temperature, call_type="triage_response_evaluator", agent_name="TriageResponseEvaluator", model_override=model_override, ) def call_medical_assistant_api(self, system_prompt: str, user_prompt: str, temperature: float = 0.3, model_override: Optional[str] = None) -> str: """Call AI API for medical assistant responses.""" if model_override is None and self.model_overrides: model_override = self.model_overrides.get("MedicalAssistant") if self.prompt_overrides and "MedicalAssistant" in self.prompt_overrides: system_prompt = self.prompt_overrides["MedicalAssistant"] return self.generate_response( system_prompt=system_prompt, user_prompt=user_prompt, temperature=temperature, call_type="medical_assistant", agent_name="MedicalAssistant", model_override=model_override, ) # Factory function for easy client creation def create_ai_client(agent_name: str, model_override: Optional[str] = None) -> UniversalAIClient: """ Create an AI client for a specific agent Args: agent_name: Name of the agent (e.g., "MainLifestyleAssistant") Returns: Configured UniversalAIClient instance """ return UniversalAIClient(agent_name, model_override=model_override) if __name__ == "__main__": print(" AI Client Test") print("=" * 50) # Test different agents test_agents = ["MainLifestyleAssistant", "EntryClassifier", "MedicalAssistant"] for agent_name in test_agents: print(f"\n Testing {agent_name}:") try: client = create_ai_client(agent_name) info = client.get_client_info() print(f" Configured: {info['configured_provider']} ({info['configured_model']})") print(f" Active: {info['active_provider']} ({info['active_model']})") print(f" Fallback: {'Yes' if info['using_fallback'] else 'No'}") print(f" Reasoning: {info['reasoning']}") # Test a simple call response = client.generate_response( "You are a helpful assistant.", "Say hello in one sentence.", call_type="TEST" ) print(f" Test response: {response[:100]}...") except Exception as e: print(f" Error: {e}")