import asyncio
import os
from datetime import datetime
import uuid
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File
from fastapi.responses import JSONResponse, FileResponse
import pytz
from sqlalchemy.orm import Session
from typing import List, Dict, Any
from db.models import Marker, Product, User, Ingredient
from interfaces.ingredientModels import IngredientAnalysisResult, IngredientRequest
from interfaces.productModels import ProductIngredientsRequest
from logger_manager import log_info, log_error
from db.database import get_db,SessionLocal
from db.repositories import IngredientRepository
from dotenv import load_dotenv
from langsmith import traceable
from services.ingredientFinderAgent import IngredientInfoAgentLangGraph
from services.productAnalyzerAgent import analyze_product_ingredients
from services.auth_service import get_current_user
from utils.db_utils import ingredient_db_to_pydantic
from interfaces.productModels import ProductAnalysisResponse
from services.analysis_service import get_product_data_by_marker_id as get_analysis_service_data
from utils.ingredient_utils import process_single_ingredient
# Load environment variables from a local .env file (no-op if absent).
load_dotenv()
# Maximum number of concurrent LLM/agent calls; overridable via env var.
# Get rate limit from environment variable or use default
PARALLEL_RATE_LIMIT = int(os.getenv("PARALLEL_RATE_LIMIT", 10))
log_info(f"Using parallel rate limit of {PARALLEL_RATE_LIMIT}")
# Create a semaphore to limit concurrent API calls
# NOTE(review): llm_semaphore is not acquired anywhere in this file's
# endpoints — confirm whether callers are expected to use it for throttling.
llm_semaphore = asyncio.Semaphore(PARALLEL_RATE_LIMIT)
router = APIRouter()
# process single ingredient
@router.post("/process_ingredient", response_model=IngredientAnalysisResult)
@traceable
async def process_ingredient_endpoint(request: IngredientRequest, db: Session = Depends(get_db)):
    """Analyze one ingredient, serving a cached DB record when available.

    Looks the ingredient up by name first; on a cache miss it queries the
    LangGraph agent, persists the result, and returns it. Any unexpected
    failure is logged and surfaced as a 500.
    """
    try:
        log_info(f"Received request to process ingredient: {request.name}")
        repo = IngredientRepository(db)

        # Fast path: already analyzed and stored.
        cached = repo.get_ingredient_by_name(request.name)
        if cached:
            log_info(f"Found existing ingredient in database: {request.name}")
            # Map the ORM row onto the Pydantic response schema.
            return ingredient_db_to_pydantic(cached)

        # Cache miss: ask the agent for a fresh analysis.
        agent = IngredientInfoAgentLangGraph()
        try:
            analysis = await agent.process_ingredient_async(request.name)
        except RuntimeError:
            # Fall back to the synchronous agent path when the async one
            # cannot run (e.g. event-loop conflicts).
            analysis = agent.process_ingredient(request.name)

        # Persist for future requests, then return the fresh result.
        repo.create_ingredient(analysis)
        log_info(f"Saved new ingredient to database: {request.name}")
        return analysis
    except Exception as e:
        log_error(f"Error processing ingredient: {e}",e)
        raise HTTPException(status_code=500, detail="Internal Server Error")
@router.post("/process_product_ingredients", response_model=Dict[str, Any])
@traceable
async def process_ingredients_endpoint(product_ingredient: ProductIngredientsRequest, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Analyze a product's full ingredient list.

    Steps:
      1. Resolve each ingredient concurrently, bounded by ``llm_semaphore``.
      2. Run the product-level analyzer with the user's preferences.
      3. Assemble and return the aggregate response payload.

    Raises:
        HTTPException: 500 on any unexpected failure.
    """
    log_info(f"process_ingredients_endpoint called for {len(product_ingredient.ingredients)} ingredients")
    ingredients = product_ingredient.ingredients
    try:
        # Step 1: Process individual ingredients.
        # BUGFIX: the module-level llm_semaphore was created but never
        # acquired, so asyncio.gather previously ran every ingredient at
        # once despite the advertised rate limit. Each task now holds the
        # semaphore for its duration, capping concurrency at
        # PARALLEL_RATE_LIMIT.
        async def _limited(name):
            # One bounded agent call per ingredient name.
            async with llm_semaphore:
                return await process_single_ingredient(name)

        log_info(f"Starting parallel ingredient processing with rate limit {PARALLEL_RATE_LIMIT}")
        tasks = [_limited(name) for name in ingredients]
        # Execute tasks concurrently with rate limiting (empty list => []).
        ingredient_results = await asyncio.gather(*tasks)
        log_info(f"Completed parallel processing of {len(ingredient_results)} ingredients")

        # Step 2: Generate aggregate analysis with product analyzer agent.
        # Preferences are read defensively: the preferences table/relation
        # may not exist, so failures degrade to None values.
        user_preferences = {}
        if current_user:
            user_preferences["user_id"] = current_user.id
            try:
                # Only try to access preferences if the relationship exists.
                if hasattr(current_user, 'preferences') and current_user.preferences:
                    user_preferences["allergies"] = current_user.preferences[0].allergens
                    user_preferences["dietary_restrictions"] = current_user.preferences[0].dietary_restrictions
                else:
                    user_preferences["allergies"] = None
                    user_preferences["dietary_restrictions"] = None
            except Exception as e:
                log_error(f"Error accessing user preferences: {e}", e)
                user_preferences["allergies"] = None
                user_preferences["dietary_restrictions"] = None

        product_analysis = await analyze_product_ingredients(
            ingredients_data=ingredient_results,
            user_preferences=user_preferences
        )

        # Step 3: Prepare final response.
        result = {
            "ingredients_count": len(ingredients),
            "processed_ingredients": ingredient_results,
            "ingredient_ids": product_analysis["ingredient_ids"],
            "overall_analysis": product_analysis,
            "user_id": current_user.id if current_user else None,
            "timestamp": datetime.now(tz=pytz.timezone('Asia/Kolkata')).isoformat()
        }
        log_info("process_ingredients_endpoint completed successfully")
        return result
    except Exception as e:
        log_error(f"Error in process_ingredients_endpoint: {str(e)}",e)
        raise HTTPException(status_code=500, detail="Internal Server Error")
@router.get("/get_by_marker_id/{target_id}", response_model=None)
async def get_analysis_by_marker_id(target_id: str, db: Session = Depends(get_db)):
    """
    Retrieves product analysis and ingredient information by marker ID.
    """
    log_info(f"Received request for analysis by marker ID: {target_id}")

    def _now():
        # Single source for the IST timestamp stamped on every response body.
        return datetime.now(tz=pytz.timezone('Asia/Kolkata')).isoformat()

    def _not_found(message):
        # Shared shape for both 404 payloads below.
        return JSONResponse(
            status_code=404,
            content={
                "found": False,
                "message": message,
                "timestamp": _now()
            }
        )

    try:
        # Guard: the marker must exist before we can resolve a product.
        marker = db.query(Marker).filter(Marker.vuforia_id == target_id).first()
        if marker is None:
            return _not_found(f"Marker with ID {target_id} not found")

        # Guard: the marker must point at a real product row.
        product = db.query(Product).filter(Product.id == marker.product_id).first()
        if product is None:
            return _not_found(f"Product not found for marker ID: {target_id}")

        # Preferred path: the full analysis payload from the service layer.
        product_data = get_analysis_service_data(db, target_id)
        if product_data:
            return product_data

        # Analysis unavailable: fall back to a minimal product summary.
        return JSONResponse(
            status_code=200,
            content={
                "found": True,
                "basic_info": {
                    "product_id": str(product.id),
                    "product_name": getattr(product, 'product_name', 'Unknown Product'),
                },
                "message": "Product found but analysis data could not be processed",
                "timestamp": _now()
            }
        )
    except Exception as e:
        log_error(f"Error in get_analysis_by_marker_id: {str(e)}", e)
        return JSONResponse(
            status_code=500,
            content={
                "found": False,
                "error": str(e),
                "message": "Error processing request",
                "timestamp": _now()
            }
        )