Spaces:
Sleeping
Sleeping
| import openai | |
| import streamlit as st | |
| import json | |
| from typing import Dict, Optional | |
| from config import get_openai_api_key, ANALYSIS_PROMPT_TEMPLATE, AI_MODEL | |
| def analyze_shelf_image(image_base64: str, product_name: str, analysis_depth: str = "Podstawowy") -> Optional[Dict]: | |
| """ | |
| Analyze shelf image using OpenAI GPT-4 Vision API | |
| Args: | |
| image_base64: Base64 encoded image | |
| product_name: Name of product to search for | |
| analysis_depth: Level of analysis detail | |
| Returns: | |
| Dictionary with analysis results or None if failed | |
| """ | |
| try: | |
| # Get API key and validate | |
| api_key = get_openai_api_key() | |
| if not api_key: | |
| st.error("⚠️ OpenAI API key not configured!") | |
| return None | |
| # Initialize OpenAI client | |
| client = openai.OpenAI(api_key=api_key) | |
| # Prepare the prompt | |
| prompt = ANALYSIS_PROMPT_TEMPLATE.format( | |
| product_name=product_name, | |
| analysis_depth=analysis_depth | |
| ) | |
| # Make API call | |
| response = client.chat.completions.create( | |
| model=AI_MODEL, | |
| messages=[ | |
| { | |
| "role": "user", | |
| "content": [ | |
| { | |
| "type": "text", | |
| "text": prompt | |
| }, | |
| { | |
| "type": "image_url", | |
| "image_url": { | |
| "url": f"data:image/jpeg;base64,{image_base64}", | |
| "detail": "high" | |
| } | |
| } | |
| ] | |
| } | |
| ], | |
| max_completion_tokens=1500 | |
| ) | |
| # Parse response | |
| content = response.choices[0].message.content | |
| # Try to extract JSON from response | |
| try: | |
| # Find JSON in the response | |
| start_idx = content.find('{') | |
| end_idx = content.rfind('}') + 1 | |
| if start_idx != -1 and end_idx != -1: | |
| json_str = content[start_idx:end_idx] | |
| analysis_results = json.loads(json_str) | |
| # Validate required fields | |
| required_fields = ['product_found', 'overall_score', 'confidence'] | |
| for field in required_fields: | |
| if field not in analysis_results: | |
| st.warning(f"Missing required field: {field}") | |
| analysis_results[field] = get_default_value(field) | |
| # Ensure proper data types | |
| analysis_results = normalize_analysis_results(analysis_results) | |
| return analysis_results | |
| else: | |
| st.error("Could not find JSON in AI response") | |
| return create_fallback_result(product_name, content) | |
| except json.JSONDecodeError as e: | |
| st.error(f"Failed to parse AI response as JSON: {str(e)}") | |
| return create_fallback_result(product_name, content) | |
| except Exception as e: | |
| st.error(f"Error during AI analysis: {str(e)}") | |
| return None | |
| def normalize_analysis_results(results: Dict) -> Dict: | |
| """Normalize analysis results to ensure proper data types""" | |
| try: | |
| # Ensure boolean fields | |
| bool_fields = ['product_found', 'price_visible'] | |
| for field in bool_fields: | |
| if field in results: | |
| if isinstance(results[field], str): | |
| results[field] = results[field].lower() in ['true', '1', 'yes', 'tak'] | |
| else: | |
| results[field] = bool(results[field]) | |
| # Ensure numeric fields | |
| if 'overall_score' in results: | |
| try: | |
| results['overall_score'] = max(1, min(10, int(float(results['overall_score'])))) | |
| except: | |
| results['overall_score'] = 5 | |
| if 'confidence' in results: | |
| try: | |
| results['confidence'] = max(0.0, min(1.0, float(results['confidence']))) | |
| except: | |
| results['confidence'] = 0.5 | |
| if 'facing_count' in results: | |
| try: | |
| results['facing_count'] = max(0, int(results['facing_count'])) | |
| except: | |
| results['facing_count'] = 0 | |
| if 'shelf_share' in results: | |
| try: | |
| results['shelf_share'] = max(0, min(100, float(results['shelf_share']))) | |
| except: | |
| results['shelf_share'] = 0 | |
| # Ensure string fields | |
| string_fields = ['shelf_position', 'product_condition', 'description'] | |
| for field in string_fields: | |
| if field in results and not isinstance(results[field], str): | |
| results[field] = str(results[field]) | |
| # Ensure list fields | |
| if 'competitors_nearby' in results and not isinstance(results['competitors_nearby'], list): | |
| results['competitors_nearby'] = [] | |
| return results | |
| except Exception as e: | |
| st.warning(f"Error normalizing results: {str(e)}") | |
| return results | |
| def get_default_value(field: str): | |
| """Get default value for missing field""" | |
| defaults = { | |
| 'product_found': False, | |
| 'facing_count': 0, | |
| 'shelf_position': 'unknown', | |
| 'price_visible': False, | |
| 'product_condition': 'unknown', | |
| 'overall_score': 5, | |
| 'confidence': 0.5, | |
| 'description': 'Analysis incomplete', | |
| 'competitors_nearby': [], | |
| 'shelf_share': 0 | |
| } | |
| return defaults.get(field, None) | |
| def create_fallback_result(product_name: str, ai_response: str) -> Dict: | |
| """Create fallback result when JSON parsing fails""" | |
| # Try to extract basic information from text response | |
| product_found = any(word in ai_response.lower() for word in ['found', 'visible', 'present', 'znaleziono', 'widoczny']) | |
| return { | |
| 'product_found': product_found, | |
| 'facing_count': 1 if product_found else 0, | |
| 'shelf_position': 'unknown', | |
| 'price_visible': 'cena' in ai_response.lower() or 'price' in ai_response.lower(), | |
| 'product_condition': 'good', | |
| 'overall_score': 6 if product_found else 3, | |
| 'confidence': 0.6, | |
| 'description': f"Analysis of {product_name}: {ai_response[:200]}...", | |
| 'competitors_nearby': [], | |
| 'shelf_share': 10 if product_found else 0 | |
| } |