import base64
import json
import mimetypes
import os
from typing import List, Dict, Optional, Tuple

from openai import OpenAI
import google.generativeai as genai
|
|
|
class ModelConfig:
    """Configuration loader for API keys and model preferences.

    Settings come from environment variables first, then from a simple
    ``KEY=VALUE`` config file (``#``-prefixed comment lines are skipped).
    Environment variables take precedence over file entries for API keys.
    Malformed numeric or boolean entries in the file are ignored and the
    built-in defaults are kept instead of raising.
    """

    # Keys whose values are comma-separated model lists.
    _LIST_KEYS = {
        'NVIDIA_MODELS', 'NVIDIA_VISION_MODELS',
        'NVIDIA_TEXT_MODELS', 'GEMINI_MODELS',
    }
    # Keys parsed as integers / floats / booleans respectively.
    _INT_KEYS = {'MAX_TOKENS', 'REQUEST_TIMEOUT', 'MAX_RETRIES'}
    _FLOAT_KEYS = {'TEMPERATURE'}
    _BOOL_KEYS = {'ENABLE_STREAMING', 'VERBOSE_LOGGING', 'ENABLE_GEMINI_FALLBACK'}

    def __init__(self, config_file: str = "config.env"):
        self.config_file = config_file
        self.config = self._load_config()

    @staticmethod
    def _split_csv(value: str) -> List[str]:
        """Split a comma-separated value into a list of non-empty, stripped names."""
        return [m.strip() for m in value.split(',') if m.strip()]

    @staticmethod
    def _unquote(value: str) -> str:
        """Strip one matching pair of surrounding quotes, a common .env convention.

        The previous parser kept the quotes, which corrupted API keys written
        as ``KEY="value"``.
        """
        if len(value) >= 2 and value[0] == value[-1] and value[0] in ('"', "'"):
            return value[1:-1]
        return value

    def _load_config(self) -> Dict:
        """Load configuration from environment variables and the config file.

        Returns:
            Dict with API keys, model lists, and request settings.
        """
        config = {
            'nvidia_api_key': os.getenv('NVIDIA_API_KEY'),
            'gemini_api_key': os.getenv('GEMINI_API_KEY'),
            'nvidia_models': [],
            'nvidia_vision_models': [],
            'nvidia_text_models': [],
            'gemini_models': [],
            'max_tokens': 500,
            'temperature': 0.2,
            'request_timeout': 30,
            'max_retries': 2,
            'enable_streaming': True,
            'verbose_logging': True,
            'enable_gemini_fallback': True,
        }

        if not os.path.exists(self.config_file):
            return config

        with open(self.config_file, 'r') as f:
            for raw_line in f:
                line = raw_line.strip()
                # Skip blanks, comments, and lines without a KEY=VALUE shape.
                if not line or line.startswith('#') or '=' not in line:
                    continue

                key, value = line.split('=', 1)
                key = key.strip()
                value = self._unquote(value.strip())

                if key == 'NVIDIA_API_KEY' and not config['nvidia_api_key']:
                    config['nvidia_api_key'] = value
                elif key == 'GEMINI_API_KEY' and not config['gemini_api_key']:
                    config['gemini_api_key'] = value
                elif key in self._LIST_KEYS:
                    config[key.lower()] = self._split_csv(value)
                elif key in self._INT_KEYS:
                    try:
                        config[key.lower()] = int(value)
                    except ValueError:
                        pass  # malformed entry: keep the default
                elif key in self._FLOAT_KEYS:
                    try:
                        config[key.lower()] = float(value)
                    except ValueError:
                        pass  # malformed entry: keep the default
                elif key in self._BOOL_KEYS:
                    config[key.lower()] = value.lower() == 'true'

        return config

    def get(self, key: str, default=None):
        """Get a configuration value, or *default* if the key is unknown."""
        return self.config.get(key, default)
|
|
|
|
|
class ImageAnalyzer:
    """Multi-model image analyzer with cascading fallback support.

    Tries each configured NVIDIA vision model in order; if all fail and the
    Gemini fallback is enabled, tries each configured Gemini model.  The
    first successful response wins.
    """

    def __init__(self, config_file: str = "config.env"):
        self.config = ModelConfig(config_file)
        self.verbose = self.config.get('verbose_logging', True)

        # NVIDIA models are served through an OpenAI-compatible endpoint.
        nvidia_key = self.config.get('nvidia_api_key')
        if nvidia_key:
            self.nvidia_client = OpenAI(
                base_url="https://integrate.api.nvidia.com/v1",
                api_key=nvidia_key
            )
        else:
            self.nvidia_client = None
            self._log("⚠️ NVIDIA API key not found", force=True)

        gemini_key = self.config.get('gemini_api_key')
        if gemini_key and self.config.get('enable_gemini_fallback'):
            genai.configure(api_key=gemini_key)
            self.gemini_enabled = True
        else:
            self.gemini_enabled = False
            # Only warn when the fallback was requested but no key is present.
            if self.config.get('enable_gemini_fallback'):
                self._log("⚠️ Gemini API key not found", force=True)

    def _log(self, message: str, force: bool = False):
        """Print *message* if verbose logging is enabled (or *force* is set)."""
        if self.verbose or force:
            print(message)

    def _image_mime_type(self, image_path: str) -> str:
        """Guess the image MIME type from the file name.

        Falls back to ``image/png`` when the extension is unknown or not an
        image type.  Previously the MIME type was hard-coded to PNG, so JPEG
        and other formats were sent with a wrong content type.
        """
        mime, _ = mimetypes.guess_type(image_path)
        if mime and mime.startswith('image/'):
            return mime
        return 'image/png'

    def encode_image(self, image_path: str) -> str:
        """Encode the image file at *image_path* to a base64 string."""
        with open(image_path, "rb") as image_file:
            return base64.b64encode(image_file.read()).decode('utf-8')

    def _try_nvidia_model(self, model_name: str, base64_image: str, prompt: str,
                          mime_type: str = "image/png") -> Tuple[bool, Optional[str]]:
        """
        Try to analyze the image with a specific NVIDIA model.

        Args:
            model_name: NVIDIA model identifier.
            base64_image: Base64-encoded image payload.
            prompt: Analysis prompt.
            mime_type: MIME type used in the data URL (default kept for
                backward compatibility with existing callers).

        Returns:
            Tuple of (success: bool, response: Optional[str])
        """
        if not self.nvidia_client:
            return False, None

        self._log(f"\n{'='*60}")
        self._log(f"🔄 Attempting NVIDIA Model: {model_name}")
        self._log(f"{'='*60}")

        try:
            streaming = self.config.get('enable_streaming', True)
            completion = self.nvidia_client.chat.completions.create(
                model=model_name,
                messages=[
                    {
                        "role": "user",
                        "content": [
                            {"type": "text", "text": prompt},
                            {
                                "type": "image_url",
                                "image_url": {
                                    "url": f"data:{mime_type};base64,{base64_image}"
                                }
                            }
                        ]
                    }
                ],
                max_tokens=self.config.get('max_tokens', 500),
                temperature=self.config.get('temperature', 0.2),
                stream=streaming
            )

            self._log(f"\n✅ Model Response:\n" + "-"*60)
            response_text = ""

            if streaming:
                for chunk in completion:
                    # Some chunks (e.g. usage-only) carry no choices.
                    if not chunk.choices:
                        continue
                    content = chunk.choices[0].delta.content
                    if content is not None:
                        response_text += content
                        # Print tokens inline; _log/print would add a newline
                        # per chunk and garble the streamed output.
                        if self.verbose:
                            print(content, end='', flush=True)
            else:
                response_text = completion.choices[0].message.content
                self._log(response_text)

            self._log("\n" + "-"*60)
            return True, response_text

        except Exception as e:
            # Any provider/network failure just means "try the next model".
            self._log(f"\n❌ Model Failed: {e}")
            return False, None

    def _try_gemini_model(self, model_name: str, base64_image: str, prompt: str,
                          mime_type: str = "image/png") -> Tuple[bool, Optional[str]]:
        """
        Try to analyze the image with a specific Gemini model.

        Args:
            model_name: Gemini model identifier.
            base64_image: Base64-encoded image payload.
            prompt: Analysis prompt.
            mime_type: MIME type of the image data (default kept for
                backward compatibility with existing callers).

        Returns:
            Tuple of (success: bool, response: Optional[str])
        """
        if not self.gemini_enabled:
            return False, None

        self._log(f"\n{'='*60}")
        self._log(f"🔄 Attempting Gemini Model: {model_name}")
        self._log(f"{'='*60}")

        try:
            model = genai.GenerativeModel(model_name)

            # Gemini takes raw bytes plus an explicit MIME type.
            image_part = {
                'mime_type': mime_type,
                'data': base64.b64decode(base64_image)
            }

            response = model.generate_content([prompt, image_part])
            response_text = response.text

            self._log(f"\n✅ Model Response:\n" + "-"*60)
            self._log(response_text)
            self._log("\n" + "-"*60)

            return True, response_text

        except Exception as e:
            self._log(f"\n❌ Model Failed: {e}")
            return False, None

    def analyze_image(self, image_path: str, prompt: str = "Please summarize what you see in this image.") -> Dict:
        """
        Analyze an image with cascading fallback across multiple models.

        Args:
            image_path: Path to the image file.
            prompt: Analysis prompt.

        Returns:
            Dictionary with keys ``success``, ``error``, ``model_used``,
            ``provider`` and ``response`` (plus ``suggestions`` when every
            model fails).  The ``provider`` key is now present in every
            return shape for a consistent result schema.
        """
        if not os.path.exists(image_path):
            return {
                'success': False,
                'error': f"Image not found: {image_path}",
                'model_used': None,
                'provider': None,
                'response': None
            }

        self._log(f"📸 Processing {image_path}...", force=True)

        try:
            base64_image = self.encode_image(image_path)
        except Exception as e:
            return {
                'success': False,
                'error': f"Error encoding image: {e}",
                'model_used': None,
                'provider': None,
                'response': None
            }

        mime_type = self._image_mime_type(image_path)

        for model_name in self.config.get('nvidia_models', []):
            success, response = self._try_nvidia_model(
                model_name, base64_image, prompt, mime_type)
            if success:
                return {
                    'success': True,
                    'error': None,
                    'model_used': model_name,
                    'provider': 'NVIDIA',
                    'response': response
                }

        if self.config.get('enable_gemini_fallback', True):
            self._log(f"\n⚠️ All NVIDIA models failed. Trying Gemini fallback...", force=True)
            for model_name in self.config.get('gemini_models', []):
                success, response = self._try_gemini_model(
                    model_name, base64_image, prompt, mime_type)
                if success:
                    return {
                        'success': True,
                        'error': None,
                        'model_used': model_name,
                        'provider': 'Gemini',
                        'response': response
                    }

        return {
            'success': False,
            'error': 'All models failed to analyze the image',
            'model_used': None,
            'provider': None,
            'response': None,
            'suggestions': [
                'Check your API key validity',
                'Verify your internet connection',
                'Ensure the image format is supported',
                'Check API service status'
            ]
        }
|
|
|
|
|
def main():
    """Manual smoke test: analyze a fixed image and print the outcome."""
    image_path = "image.png"

    outcome = ImageAnalyzer().analyze_image(image_path)

    banner = "=" * 60
    print(f"\n{banner}")
    print("📊 ANALYSIS RESULTS")
    print(banner)
    print(f"Success: {outcome['success']}")

    if outcome['success']:
        print(f"Provider: {outcome['provider']}")
        print(f"Model Used: {outcome['model_used']}")
        print("\n✅ Image analysis completed successfully!")
        return

    print(f"Error: {outcome['error']}")
    if 'suggestions' in outcome:
        print("\n💡 Suggestions:")
        for tip in outcome['suggestions']:
            print(f"  - {tip}")
|
|
|
|
|
# Run the standalone demo only when this module is executed as a script.
if __name__ == "__main__":

    main()
|
|
|