File size: 2,155 Bytes
8986db1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import asyncio
from datetime import datetime
from typing import List, Dict, Any, Optional

import pytz

from logger_manager import log_info, log_error
from services.productAnalyzerAgent import analyze_product_ingredients
from utils.ingredient_utils import process_single_ingredient

# Load environment variables
from env import PARALLEL_RATE_LIMIT

# Record the configured concurrency limit once, when this module is imported.
log_info(f"Using parallel rate limit of {PARALLEL_RATE_LIMIT}")

# Semaphore sized by PARALLEL_RATE_LIMIT, intended to limit concurrent API calls.
# NOTE(review): nothing in the visible part of this module acquires it --
# confirm whether another module imports it or whether it is unused.
llm_semaphore = asyncio.Semaphore(PARALLEL_RATE_LIMIT)


async def process_product_ingredients(product_ingredients: List[str]) -> Optional[Dict[str, Any]]:
    """Process every ingredient of a product and build a combined analysis.

    Each ingredient name is passed to ``process_single_ingredient`` and the
    resulting awaitables are awaited together with ``asyncio.gather``, so the
    results are in the same order as ``product_ingredients``. The collected
    results are then handed to ``analyze_product_ingredients``.

    Args:
        product_ingredients: Ingredient names to process.

    Returns:
        A dict with the keys ``ingredients_count``, ``processed_ingredients``,
        ``ingredient_ids``, ``overall_analysis`` and ``timestamp`` (ISO-8601
        string in the Asia/Kolkata timezone), or ``None`` if an exception is
        raised during processing. In that case the error is reported through
        ``log_error`` instead of being propagated to the caller.
    """
    log_info(f"process_product_ingredients called for {len(product_ingredients)} ingredients")
    try:
        # Step 1: Process individual ingredients
        log_info(f"Starting parallel ingredient processing with rate limit {PARALLEL_RATE_LIMIT}")

        # NOTE(review): no semaphore is acquired here, so all ingredients are
        # started at once; any throttling to PARALLEL_RATE_LIMIT would have to
        # happen inside process_single_ingredient -- confirm against that helper.
        pending = [process_single_ingredient(name) for name in product_ingredients]
        ingredient_results = await asyncio.gather(*pending)
        log_info(f"Completed parallel processing of {len(ingredient_results)} ingredients")

        # Step 2: Analyze the product as a whole from the per-ingredient results
        product_analysis = await analyze_product_ingredients(
            ingredients_data=ingredient_results
        )

        # Step 3: Prepare final response
        result = {
            "ingredients_count": len(product_ingredients),
            "processed_ingredients": ingredient_results,
            "ingredient_ids": product_analysis["ingredient_ids"],
            "overall_analysis": product_analysis,
            "timestamp": datetime.now(tz=pytz.timezone('Asia/Kolkata')).isoformat()
        }

        log_info("process_product_ingredients completed successfully")
        return result

    except Exception as e:
        # Top-level boundary: log the failure and signal it with None so that
        # existing callers, which expect no exception from here, keep working.
        log_error(f"Error in process_product_ingredients: {str(e)}", e)
        return None