import asyncio import os from datetime import datetime import uuid from fastapi import APIRouter, Depends, HTTPException, UploadFile, File from fastapi.responses import JSONResponse, FileResponse import pytz from sqlalchemy.orm import Session from typing import List, Dict, Any from db.models import Marker, Product, User, Ingredient from interfaces.ingredientModels import IngredientAnalysisResult, IngredientRequest from interfaces.productModels import ProductIngredientsRequest from logger_manager import log_info, log_error from db.database import get_db,SessionLocal from db.repositories import IngredientRepository from dotenv import load_dotenv from langsmith import traceable from services.ingredientFinderAgent import IngredientInfoAgentLangGraph from services.productAnalyzerAgent import analyze_product_ingredients from services.auth_service import get_current_user from utils.db_utils import ingredient_db_to_pydantic from interfaces.productModels import ProductAnalysisResponse from services.analysis_service import get_product_data_by_marker_id as get_analysis_service_data from utils.ingredient_utils import process_single_ingredient # Load environment variables load_dotenv() # Get rate limit from environment variable or use default PARALLEL_RATE_LIMIT = int(os.getenv("PARALLEL_RATE_LIMIT", 10)) log_info(f"Using parallel rate limit of {PARALLEL_RATE_LIMIT}") # Create a semaphore to limit concurrent API calls llm_semaphore = asyncio.Semaphore(PARALLEL_RATE_LIMIT) router = APIRouter() # process single ingredient @router.post("/process_ingredient", response_model=IngredientAnalysisResult) @traceable async def process_ingredient_endpoint(request: IngredientRequest, db: Session = Depends(get_db)): try: log_info(f"Received request to process ingredient: {request.name}") # Check if we already have this ingredient in the database repo = IngredientRepository(db) db_ingredient = repo.get_ingredient_by_name(request.name) if db_ingredient: log_info(f"Found existing ingredient in database: {request.name}") # Convert DB model to Pydantic model # (This would need a function to correctly map the data) return ingredient_db_to_pydantic(db_ingredient) # If not in database, get from agent ingredient_finder = IngredientInfoAgentLangGraph() # run async function if found event loop already running use normal function try: result = await ingredient_finder.process_ingredient_async(request.name) except RuntimeError: # If the event loop is not running, run the function normally result = ingredient_finder.process_ingredient(request.name) # Save to database repo.create_ingredient(result) log_info(f"Saved new ingredient to database: {request.name}") return result except Exception as e: log_error(f"Error processing ingredient: {e}",e) raise HTTPException(status_code=500, detail="Internal Server Error") @router.post("/process_product_ingredients", response_model=Dict[str, Any]) @traceable async def process_ingredients_endpoint(product_ingredient: ProductIngredientsRequest, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)): log_info(f"process_ingredients_endpoint called for {len(product_ingredient.ingredients)} ingredients") ingredients = product_ingredient.ingredients try: # Step 1: Process individual ingredients ingredient_results = [] log_info(f"Starting parallel ingredient processing with rate limit {PARALLEL_RATE_LIMIT}") # Create tasks for parallel processing tasks = [] for ingredient_name in ingredients: task = process_single_ingredient(ingredient_name) tasks.append(task) # Execute tasks concurrently with rate limiting ingredient_results = await asyncio.gather(*tasks) log_info(f"Completed parallel processing of {len(ingredient_results)} ingredients") # Step 2: Generate aggregate analysis with product analyzer agent # Safely get user preferences, handling the case where the preferences table doesn't exist user_preferences = {} if current_user: user_preferences["user_id"] = current_user.id try: # Only try to access preferences if the relationship exists if hasattr(current_user, 'preferences') and current_user.preferences: user_preferences["allergies"] = current_user.preferences[0].allergens user_preferences["dietary_restrictions"] = current_user.preferences[0].dietary_restrictions else: user_preferences["allergies"] = None user_preferences["dietary_restrictions"] = None except Exception as e: log_error(f"Error accessing user preferences: {e}", e) user_preferences["allergies"] = None user_preferences["dietary_restrictions"] = None product_analysis = await analyze_product_ingredients( ingredients_data=ingredient_results, user_preferences=user_preferences ) # print("Product analysis result:", product_analysis) # Step 3: Prepare final response result = { "ingredients_count": len(ingredients), "processed_ingredients": ingredient_results, "ingredient_ids": product_analysis["ingredient_ids"], "overall_analysis": product_analysis, "user_id": current_user.id if current_user else None, "timestamp": datetime.now(tz=pytz.timezone('Asia/Kolkata')).isoformat() } log_info("process_ingredients_endpoint completed successfully") return result except Exception as e: log_error(f"Error in process_ingredients_endpoint: {str(e)}",e) raise HTTPException(status_code=500, detail="Internal Server Error") @router.get("/get_by_marker_id/{target_id}", response_model=None) async def get_analysis_by_marker_id(target_id: str, db: Session = Depends(get_db)): """ Retrieves product analysis and ingredient information by marker ID. """ log_info(f"Received request for analysis by marker ID: {target_id}") try: # Check if marker exists marker = db.query(Marker).filter(Marker.vuforia_id == target_id).first() if not marker: return JSONResponse( status_code=404, content={ "found": False, "message": f"Marker with ID {target_id} not found", "timestamp": datetime.now(tz=pytz.timezone('Asia/Kolkata')).isoformat() } ) # Check if product exists product = db.query(Product).filter(Product.id == marker.product_id).first() if not product: return JSONResponse( status_code=404, content={ "found": False, "message": f"Product not found for marker ID: {target_id}", "timestamp": datetime.now(tz=pytz.timezone('Asia/Kolkata')).isoformat() } ) # Try to get complete product data product_data = get_analysis_service_data(db, target_id) # If complete data retrieval fails, return minimal data if not product_data: return JSONResponse( status_code=200, content={ "found": True, "basic_info": { "product_id": str(product.id), "product_name": getattr(product, 'product_name', 'Unknown Product'), }, "message": "Product found but analysis data could not be processed", "timestamp": datetime.now(tz=pytz.timezone('Asia/Kolkata')).isoformat() } ) return product_data except Exception as e: log_error(f"Error in get_analysis_by_marker_id: {str(e)}", e) return JSONResponse( status_code=500, content={ "found": False, "error": str(e), "message": "Error processing request", "timestamp": datetime.now(tz=pytz.timezone('Asia/Kolkata')).isoformat() } )