Spaces:
Running
Running
File size: 2,155 Bytes
8986db1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
import asyncio
from datetime import datetime
import pytz
from typing import List, Dict, Any
from logger_manager import log_info, log_error
from services.productAnalyzerAgent import analyze_product_ingredients
from utils.ingredient_utils import process_single_ingredient
# Load environment variables
from env import PARALLEL_RATE_LIMIT
log_info(f"Using parallel rate limit of {PARALLEL_RATE_LIMIT}")
# Create a semaphore to limit concurrent API calls
llm_semaphore = asyncio.Semaphore(PARALLEL_RATE_LIMIT)
async def process_product_ingredients(product_ingredients: List[str]) -> Dict[str, Any]:
log_info(f"process_product_ingredients called for {len(product_ingredients)} ingredients")
try:
# Step 1: Process individual ingredients
ingredient_results = []
log_info(f"Starting parallel ingredient processing with rate limit {PARALLEL_RATE_LIMIT}")
# Create tasks for parallel processing
tasks = []
for ingredient_name in product_ingredients:
task = process_single_ingredient(ingredient_name)
tasks.append(task)
# Execute tasks concurrently with rate limiting
ingredient_results = await asyncio.gather(*tasks)
log_info(f"Completed parallel processing of {len(ingredient_results)} ingredients")
product_analysis = await analyze_product_ingredients(
ingredients_data=ingredient_results
)
# print("Product analysis result:", product_analysis)
# Step 3: Prepare final response
result = {
"ingredients_count": len(product_ingredients),
"processed_ingredients": ingredient_results,
"ingredient_ids": product_analysis["ingredient_ids"],
"overall_analysis": product_analysis,
"timestamp": datetime.now(tz=pytz.timezone('Asia/Kolkata')).isoformat()
}
log_info("process_product_ingredients completed successfully")
return result
except Exception as e:
log_error(f"Error in process_product_ingredients: {str(e)}",e)
return None
|