Spaces:
Sleeping
Sleeping
File size: 6,638 Bytes
af2bcb1 242f216 af2bcb1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 |
import openai
import streamlit as st
import json
from typing import Dict, Optional
from config import get_openai_api_key, ANALYSIS_PROMPT_TEMPLATE, AI_MODEL
def analyze_shelf_image(image_base64: str, product_name: str, analysis_depth: str = "Podstawowy") -> Optional[Dict]:
"""
Analyze shelf image using OpenAI GPT-4 Vision API
Args:
image_base64: Base64 encoded image
product_name: Name of product to search for
analysis_depth: Level of analysis detail
Returns:
Dictionary with analysis results or None if failed
"""
try:
# Get API key and validate
api_key = get_openai_api_key()
if not api_key:
st.error("⚠️ OpenAI API key not configured!")
return None
# Initialize OpenAI client
client = openai.OpenAI(api_key=api_key)
# Prepare the prompt
prompt = ANALYSIS_PROMPT_TEMPLATE.format(
product_name=product_name,
analysis_depth=analysis_depth
)
# Make API call
response = client.chat.completions.create(
model=AI_MODEL,
messages=[
{
"role": "user",
"content": [
{
"type": "text",
"text": prompt
},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{image_base64}",
"detail": "high"
}
}
]
}
],
max_completion_tokens=1500
)
# Parse response
content = response.choices[0].message.content
# Try to extract JSON from response
try:
# Find JSON in the response
start_idx = content.find('{')
end_idx = content.rfind('}') + 1
if start_idx != -1 and end_idx != -1:
json_str = content[start_idx:end_idx]
analysis_results = json.loads(json_str)
# Validate required fields
required_fields = ['product_found', 'overall_score', 'confidence']
for field in required_fields:
if field not in analysis_results:
st.warning(f"Missing required field: {field}")
analysis_results[field] = get_default_value(field)
# Ensure proper data types
analysis_results = normalize_analysis_results(analysis_results)
return analysis_results
else:
st.error("Could not find JSON in AI response")
return create_fallback_result(product_name, content)
except json.JSONDecodeError as e:
st.error(f"Failed to parse AI response as JSON: {str(e)}")
return create_fallback_result(product_name, content)
except Exception as e:
st.error(f"Error during AI analysis: {str(e)}")
return None
def normalize_analysis_results(results: Dict) -> Dict:
"""Normalize analysis results to ensure proper data types"""
try:
# Ensure boolean fields
bool_fields = ['product_found', 'price_visible']
for field in bool_fields:
if field in results:
if isinstance(results[field], str):
results[field] = results[field].lower() in ['true', '1', 'yes', 'tak']
else:
results[field] = bool(results[field])
# Ensure numeric fields
if 'overall_score' in results:
try:
results['overall_score'] = max(1, min(10, int(float(results['overall_score']))))
except:
results['overall_score'] = 5
if 'confidence' in results:
try:
results['confidence'] = max(0.0, min(1.0, float(results['confidence'])))
except:
results['confidence'] = 0.5
if 'facing_count' in results:
try:
results['facing_count'] = max(0, int(results['facing_count']))
except:
results['facing_count'] = 0
if 'shelf_share' in results:
try:
results['shelf_share'] = max(0, min(100, float(results['shelf_share'])))
except:
results['shelf_share'] = 0
# Ensure string fields
string_fields = ['shelf_position', 'product_condition', 'description']
for field in string_fields:
if field in results and not isinstance(results[field], str):
results[field] = str(results[field])
# Ensure list fields
if 'competitors_nearby' in results and not isinstance(results['competitors_nearby'], list):
results['competitors_nearby'] = []
return results
except Exception as e:
st.warning(f"Error normalizing results: {str(e)}")
return results
def get_default_value(field: str):
"""Get default value for missing field"""
defaults = {
'product_found': False,
'facing_count': 0,
'shelf_position': 'unknown',
'price_visible': False,
'product_condition': 'unknown',
'overall_score': 5,
'confidence': 0.5,
'description': 'Analysis incomplete',
'competitors_nearby': [],
'shelf_share': 0
}
return defaults.get(field, None)
def create_fallback_result(product_name: str, ai_response: str) -> Dict:
"""Create fallback result when JSON parsing fails"""
# Try to extract basic information from text response
product_found = any(word in ai_response.lower() for word in ['found', 'visible', 'present', 'znaleziono', 'widoczny'])
return {
'product_found': product_found,
'facing_count': 1 if product_found else 0,
'shelf_position': 'unknown',
'price_visible': 'cena' in ai_response.lower() or 'price' in ai_response.lower(),
'product_condition': 'good',
'overall_score': 6 if product_found else 3,
'confidence': 0.6,
'description': f"Analysis of {product_name}: {ai_response[:200]}...",
'competitors_nearby': [],
'shelf_share': 10 if product_found else 0
} |