Spaces:
Running
Running
| import asyncio | |
| import os | |
| from datetime import datetime | |
| import uuid | |
| from fastapi import APIRouter, Depends, HTTPException, UploadFile, File | |
| from fastapi.responses import JSONResponse, FileResponse | |
| import pytz | |
| from sqlalchemy.orm import Session | |
| from typing import List, Dict, Any | |
| from db.models import Marker, Product, User, Ingredient | |
| from interfaces.ingredientModels import IngredientAnalysisResult, IngredientRequest | |
| from interfaces.productModels import ProductIngredientsRequest | |
| from logger_manager import log_info, log_error | |
| from db.database import get_db,SessionLocal | |
| from db.repositories import IngredientRepository | |
| from dotenv import load_dotenv | |
| from langsmith import traceable | |
| from services.ingredientFinderAgent import IngredientInfoAgentLangGraph | |
| from services.productAnalyzerAgent import analyze_product_ingredients | |
| from services.auth_service import get_current_user | |
| from utils.db_utils import ingredient_db_to_pydantic | |
| from interfaces.productModels import ProductAnalysisResponse | |
| from services.analysis_service import get_product_data_by_marker_id as get_analysis_service_data | |
| from utils.ingredient_utils import process_single_ingredient | |
# Load environment variables from a local .env file (no-op if absent).
load_dotenv()

# Maximum number of concurrent LLM/agent calls. Configurable via the
# PARALLEL_RATE_LIMIT environment variable; the default is kept as a string
# so os.getenv uniformly yields a str before the int() conversion.
PARALLEL_RATE_LIMIT = int(os.getenv("PARALLEL_RATE_LIMIT", "10"))
log_info(f"Using parallel rate limit of {PARALLEL_RATE_LIMIT}")

# Semaphore shared by agent calls to cap concurrent API requests.
# NOTE(review): not acquired anywhere in this module — presumably used by
# helpers such as process_single_ingredient; confirm.
llm_semaphore = asyncio.Semaphore(PARALLEL_RATE_LIMIT)

router = APIRouter()
# process single ingredient
async def process_ingredient_endpoint(request: IngredientRequest, db: Session = Depends(get_db)):
    """Return analysis for a single ingredient, using the DB as a cache.

    Looks the ingredient up by name first; on a miss, runs the
    IngredientInfoAgentLangGraph finder and persists the result.

    Args:
        request: Payload carrying the ingredient ``name`` to process.
        db: SQLAlchemy session injected by FastAPI.

    Returns:
        The ingredient analysis (Pydantic model), either converted from the
        cached DB row or freshly produced by the agent.

    Raises:
        HTTPException: 500 on any unexpected failure.
    """
    try:
        log_info(f"Received request to process ingredient: {request.name}")
        # Cache check: reuse a previously analyzed ingredient if present.
        repo = IngredientRepository(db)
        db_ingredient = repo.get_ingredient_by_name(request.name)
        if db_ingredient:
            log_info(f"Found existing ingredient in database: {request.name}")
            # Map the ORM row onto the response model.
            return ingredient_db_to_pydantic(db_ingredient)

        # Cache miss: ask the agent for a fresh analysis.
        ingredient_finder = IngredientInfoAgentLangGraph()
        try:
            result = await ingredient_finder.process_ingredient_async(request.name)
        except RuntimeError:
            # Fallback for event-loop conflicts: run the synchronous variant.
            result = ingredient_finder.process_ingredient(request.name)

        # Persist so subsequent requests hit the cache.
        repo.create_ingredient(result)
        log_info(f"Saved new ingredient to database: {request.name}")
        return result
    except HTTPException:
        # Let deliberate HTTP errors propagate unchanged instead of
        # collapsing them into a generic 500 below.
        raise
    except Exception as e:
        log_error(f"Error processing ingredient: {e}", e)
        raise HTTPException(status_code=500, detail="Internal Server Error") from e
def _collect_user_preferences(current_user: User) -> Dict[str, Any]:
    """Build the preferences dict for the analyzer, tolerating a missing/broken preferences relation."""
    user_preferences: Dict[str, Any] = {}
    if current_user:
        user_preferences["user_id"] = current_user.id
        try:
            # Only touch .preferences if the relationship exists and is populated;
            # the preferences table may be absent in some deployments.
            if hasattr(current_user, 'preferences') and current_user.preferences:
                user_preferences["allergies"] = current_user.preferences[0].allergens
                user_preferences["dietary_restrictions"] = current_user.preferences[0].dietary_restrictions
            else:
                user_preferences["allergies"] = None
                user_preferences["dietary_restrictions"] = None
        except Exception as e:
            # Best-effort: a broken preferences table must not fail the analysis.
            log_error(f"Error accessing user preferences: {e}", e)
            user_preferences["allergies"] = None
            user_preferences["dietary_restrictions"] = None
    return user_preferences


async def process_ingredients_endpoint(product_ingredient: ProductIngredientsRequest, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Analyze a product's full ingredient list and produce an aggregate report.

    Steps: (1) process every ingredient concurrently, (2) run the product
    analyzer agent over the combined results plus user preferences,
    (3) assemble the response payload.

    Args:
        product_ingredient: Request carrying the list of ingredient names.
        db: SQLAlchemy session injected by FastAPI (currently unused here;
            individual ingredient processing manages its own sessions).
        current_user: Authenticated user, used for preference-aware analysis.

    Returns:
        Dict with processed ingredients, the aggregate analysis, ingredient
        IDs, the user ID, and an Asia/Kolkata ISO timestamp.

    Raises:
        HTTPException: 500 on any unexpected failure.
    """
    log_info(f"process_ingredients_endpoint called for {len(product_ingredient.ingredients)} ingredients")
    ingredients = product_ingredient.ingredients
    try:
        # Step 1: process all ingredients concurrently.
        # NOTE(review): no semaphore is applied at this call site — any rate
        # limiting presumably happens inside process_single_ingredient; confirm.
        log_info(f"Starting parallel ingredient processing with rate limit {PARALLEL_RATE_LIMIT}")
        ingredient_results = await asyncio.gather(
            *(process_single_ingredient(name) for name in ingredients)
        )
        log_info(f"Completed parallel processing of {len(ingredient_results)} ingredients")

        # Step 2: aggregate analysis with the product analyzer agent,
        # personalized with the user's allergies/dietary restrictions.
        user_preferences = _collect_user_preferences(current_user)
        product_analysis = await analyze_product_ingredients(
            ingredients_data=ingredient_results,
            user_preferences=user_preferences
        )

        # Step 3: assemble the final response payload.
        result = {
            "ingredients_count": len(ingredients),
            "processed_ingredients": ingredient_results,
            "ingredient_ids": product_analysis["ingredient_ids"],
            "overall_analysis": product_analysis,
            "user_id": current_user.id if current_user else None,
            "timestamp": datetime.now(tz=pytz.timezone('Asia/Kolkata')).isoformat()
        }
        log_info("process_ingredients_endpoint completed successfully")
        return result
    except Exception as e:
        log_error(f"Error in process_ingredients_endpoint: {str(e)}", e)
        raise HTTPException(status_code=500, detail="Internal Server Error") from e
def _ist_timestamp() -> str:
    """Current time in Asia/Kolkata as an ISO-8601 string, for response payloads."""
    return datetime.now(tz=pytz.timezone('Asia/Kolkata')).isoformat()


async def get_analysis_by_marker_id(target_id: str, db: Session = Depends(get_db)):
    """
    Retrieves product analysis and ingredient information by marker ID.

    Resolution order: marker (by ``vuforia_id``) -> product -> full analysis
    data. Returns a 404 JSON payload when the marker or product is missing,
    a minimal 200 payload when the product exists but analysis data cannot
    be assembled, and a 500 payload on unexpected errors (never raises).

    Args:
        target_id: Vuforia marker identifier to look up.
        db: SQLAlchemy session injected by FastAPI.
    """
    log_info(f"Received request for analysis by marker ID: {target_id}")
    try:
        # Resolve the marker first; without it nothing else can be found.
        marker = db.query(Marker).filter(Marker.vuforia_id == target_id).first()
        if not marker:
            return JSONResponse(
                status_code=404,
                content={
                    "found": False,
                    "message": f"Marker with ID {target_id} not found",
                    "timestamp": _ist_timestamp()
                }
            )

        # The marker must point at an existing product.
        product = db.query(Product).filter(Product.id == marker.product_id).first()
        if not product:
            return JSONResponse(
                status_code=404,
                content={
                    "found": False,
                    "message": f"Product not found for marker ID: {target_id}",
                    "timestamp": _ist_timestamp()
                }
            )

        # Try to assemble the complete analysis payload.
        product_data = get_analysis_service_data(db, target_id)

        # Degrade gracefully: product exists but analysis could not be built.
        if not product_data:
            return JSONResponse(
                status_code=200,
                content={
                    "found": True,
                    "basic_info": {
                        "product_id": str(product.id),
                        "product_name": getattr(product, 'product_name', 'Unknown Product'),
                    },
                    "message": "Product found but analysis data could not be processed",
                    "timestamp": _ist_timestamp()
                }
            )

        return product_data
    except Exception as e:
        # This endpoint reports errors in-band rather than raising, so AR
        # clients always receive a structured JSON body.
        log_error(f"Error in get_analysis_by_marker_id: {str(e)}", e)
        return JSONResponse(
            status_code=500,
            content={
                "found": False,
                "error": str(e),
                "message": "Error processing request",
                "timestamp": _ist_timestamp()
            }
        )