File size: 8,700 Bytes
48e95f0
 
59ab782
d576d75
a1011ad
3edbe0b
48e95f0
38c555b
 
c43a7a2
7d533c9
d576d75
9b92ec5
48e95f0
d576d75
48e95f0
 
7e333d7
59ab782
718887f
f21d678
7d533c9
e18b17c
f21d678
7e333d7
48e95f0
 
 
 
 
 
 
 
 
38c555b
689e789
38c555b
 
42b2d96
7e333d7
 
48e95f0
7e333d7
d10bf47
9b92ec5
7e333d7
 
 
 
 
 
9b92ec5
7e333d7
 
 
 
 
 
48e95f0
 
 
 
 
 
 
7e333d7
 
 
9b92ec5
7e333d7
 
d10bf47
9b92ec5
7e333d7
 
59ab782
48e95f0
3edbe0b
59ab782
 
 
 
 
48e95f0
 
59ab782
48e95f0
 
59ab782
48e95f0
 
59ab782
48e95f0
 
 
 
59ab782
 
5c8f5bf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
59ab782
 
5c8f5bf
59ab782
 
9b92ec5
d5e35e8
59ab782
 
 
 
9b92ec5
59ab782
 
48e95f0
59ab782
 
 
 
 
 
9b92ec5
3edbe0b
e18b17c
 
c43a7a2
e18b17c
 
 
 
 
 
c43a7a2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e18b17c
c43a7a2
 
e18b17c
c43a7a2
 
 
 
 
 
 
 
 
 
 
 
 
e18b17c
 
 
c43a7a2
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
import asyncio
import os
from datetime import datetime
import uuid
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File
from fastapi.responses import JSONResponse, FileResponse
import pytz
from sqlalchemy.orm import Session
from typing import List, Dict, Any
from db.models import Marker, Product, User, Ingredient
from interfaces.ingredientModels import IngredientAnalysisResult, IngredientRequest
from interfaces.productModels import ProductIngredientsRequest
from logger_manager import log_info, log_error
from db.database import get_db,SessionLocal
from db.repositories import IngredientRepository
from dotenv import load_dotenv
from langsmith import traceable
from services.ingredientFinderAgent import IngredientInfoAgentLangGraph
from services.productAnalyzerAgent import analyze_product_ingredients
from services.auth_service import get_current_user
from utils.db_utils import ingredient_db_to_pydantic
from interfaces.productModels import ProductAnalysisResponse
from services.analysis_service import get_product_data_by_marker_id as get_analysis_service_data
from utils.ingredient_utils import process_single_ingredient

# Load environment variables before any configuration below is read.
load_dotenv()

# Concurrency limit, read from the PARALLEL_RATE_LIMIT environment variable
# (falls back to 10 when unset). int() raises ValueError at import time if
# the variable is set to something that is not an integer.
PARALLEL_RATE_LIMIT = int(os.getenv("PARALLEL_RATE_LIMIT", 10))
log_info(f"Using parallel rate limit of {PARALLEL_RATE_LIMIT}")

# Semaphore sized by PARALLEL_RATE_LIMIT, intended to limit concurrent API calls.
# NOTE(review): no code visible in this module acquires it — confirm it is
# actually used (here or by an importer), otherwise the limit is not enforced.
llm_semaphore = asyncio.Semaphore(PARALLEL_RATE_LIMIT)


# Router on which the endpoints defined below are registered.
router = APIRouter()


# process single ingredient 
@router.post("/process_ingredient", response_model=IngredientAnalysisResult)
@traceable
async def process_ingredient_endpoint(request: IngredientRequest, db: Session = Depends(get_db)):
    """Return the analysis for one ingredient, reusing a stored record when possible.

    The ingredient is first looked up by name through ``IngredientRepository``.
    A stored record is converted with ``ingredient_db_to_pydantic`` and
    returned as-is. Otherwise the ingredient is analysed by
    ``IngredientInfoAgentLangGraph``, the result is saved through the
    repository, and that result is returned.

    Args:
        request: Request body; only ``request.name`` is read here.
        db: Database session injected by FastAPI.

    Returns:
        The stored or freshly produced ingredient analysis.

    Raises:
        HTTPException: 500 with a generic message when any step fails; the
            underlying exception is logged, not returned to the client.
    """
    try:
        ingredient_name = request.name
        log_info(f"Received request to process ingredient: {ingredient_name}")

        ingredient_repo = IngredientRepository(db)
        stored = ingredient_repo.get_ingredient_by_name(ingredient_name)

        if not stored:
            # Nothing stored yet: ask the agent for a fresh analysis.
            agent = IngredientInfoAgentLangGraph()
            try:
                analysis = await agent.process_ingredient_async(ingredient_name)
            except RuntimeError:
                # Fall back to the agent's synchronous entry point.
                # NOTE(review): this call runs directly inside the coroutine,
                # so it holds the event loop for as long as it takes —
                # confirm the fallback is still needed.
                analysis = agent.process_ingredient(ingredient_name)

            # Persist the new analysis so later requests hit the database.
            ingredient_repo.create_ingredient(analysis)
            log_info(f"Saved new ingredient to database: {ingredient_name}")
            return analysis

        log_info(f"Found existing ingredient in database: {ingredient_name}")
        # Map the database row onto the response model.
        return ingredient_db_to_pydantic(stored)
    except Exception as e:
        log_error(f"Error processing ingredient: {e}", e)
        raise HTTPException(status_code=500, detail="Internal Server Error")

def _collect_user_preferences(current_user: User) -> Dict[str, Any]:
    """Build the preference dict that is passed to the product analyzer.

    Returns an empty dict when ``current_user`` is falsy. Otherwise the dict
    always carries ``user_id``, ``allergies`` and ``dietary_restrictions``;
    the last two stay ``None`` when the user has no ``preferences`` entries
    or when reading them fails (the failure is logged, not raised).
    """
    if not current_user:
        return {}

    preferences: Dict[str, Any] = {
        "user_id": current_user.id,
        "allergies": None,
        "dietary_restrictions": None,
    }
    try:
        # The relationship may be missing or empty; only the first entry is used.
        stored = getattr(current_user, "preferences", None)
        if stored:
            primary = stored[0]
            # Read both values before writing either, so a failure on the
            # second one cannot leave a half-filled result.
            allergies = primary.allergens
            restrictions = primary.dietary_restrictions
            preferences["allergies"] = allergies
            preferences["dietary_restrictions"] = restrictions
    except Exception as e:
        # Best effort: preferences are optional input for the analysis.
        log_error(f"Error accessing user preferences: {e}", e)
    return preferences


@router.post("/process_product_ingredients", response_model=Dict[str, Any])
@traceable
async def process_ingredients_endpoint(product_ingredient: ProductIngredientsRequest, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Analyse every ingredient of a product and return an aggregate analysis.

    Step 1 runs ``process_single_ingredient`` for each ingredient name
    concurrently. Step 2 passes those results, together with the current
    user's preferences, to ``analyze_product_ingredients``. Step 3 assembles
    the response dict.

    Args:
        product_ingredient: Request body; only ``ingredients`` is read here.
        db: Database session injected by FastAPI (not used in this function).
        current_user: Authenticated user resolved by ``get_current_user``.

    Returns:
        A dict with ``ingredients_count``, ``processed_ingredients``,
        ``ingredient_ids``, ``overall_analysis``, ``user_id`` and
        ``timestamp`` (ISO 8601, Asia/Kolkata).

    Raises:
        HTTPException: 500 with a generic message when any step fails; the
            underlying exception is logged and chained as the cause.
    """
    log_info(f"process_ingredients_endpoint called for {len(product_ingredient.ingredients)} ingredients")
    ingredients = product_ingredient.ingredients
    try:
        # Step 1: Process individual ingredients concurrently.
        log_info(f"Starting parallel ingredient processing with rate limit {PARALLEL_RATE_LIMIT}")

        # NOTE(review): gather() schedules every coroutine at once and
        # llm_semaphore is not applied here, so any throttling has to happen
        # inside process_single_ingredient — confirm that it does.
        # The first failing ingredient makes the whole request fail.
        ingredient_results = await asyncio.gather(
            *(process_single_ingredient(ingredient_name) for ingredient_name in ingredients)
        )
        log_info(f"Completed parallel processing of {len(ingredient_results)} ingredients")

        # Step 2: Generate aggregate analysis with product analyzer agent.
        user_preferences = _collect_user_preferences(current_user)
        product_analysis = await analyze_product_ingredients(
            ingredients_data=ingredient_results,
            user_preferences=user_preferences
        )

        # Step 3: Prepare final response.
        # A missing "ingredient_ids" key raises KeyError and ends up as a 500.
        result = {
            "ingredients_count": len(ingredients),
            "processed_ingredients": ingredient_results,
            "ingredient_ids": product_analysis["ingredient_ids"],
            "overall_analysis": product_analysis,
            "user_id": current_user.id if current_user else None,
            "timestamp": datetime.now(tz=pytz.timezone('Asia/Kolkata')).isoformat()
        }

        log_info("process_ingredients_endpoint completed successfully")
        return result

    except Exception as e:
        log_error(f"Error in process_ingredients_endpoint: {str(e)}", e)
        raise HTTPException(status_code=500, detail="Internal Server Error") from e


@router.get("/get_by_marker_id/{target_id}", response_model=None)
async def get_analysis_by_marker_id(target_id: str, db: Session = Depends(get_db)):
    """
    Retrieves product analysis and ingredient information by marker ID.

    The marker is matched on ``Marker.vuforia_id`` and its product on
    ``Product.id``. A missing marker or product yields a 404 JSON body with
    ``found: False``. When the analysis service returns nothing for an
    existing product, a 200 body with only basic product info is returned.
    Otherwise the service's data is returned unchanged. Any exception is
    logged and reported as a 500 JSON body; nothing is raised to the caller.
    """
    def _stamped(status: int, body: Dict[str, Any]) -> JSONResponse:
        # Every JSON body built here ends with the current Asia/Kolkata timestamp.
        return JSONResponse(
            status_code=status,
            content={
                **body,
                "timestamp": datetime.now(tz=pytz.timezone('Asia/Kolkata')).isoformat(),
            },
        )

    log_info(f"Received request for analysis by marker ID: {target_id}")
    try:
        # The marker must exist...
        marker = db.query(Marker).filter(Marker.vuforia_id == target_id).first()
        if not marker:
            return _stamped(404, {
                "found": False,
                "message": f"Marker with ID {target_id} not found",
            })

        # ...and so must the product it points to.
        product = db.query(Product).filter(Product.id == marker.product_id).first()
        if not product:
            return _stamped(404, {
                "found": False,
                "message": f"Product not found for marker ID: {target_id}",
            })

        # Full analysis data, when the service can produce it.
        product_data = get_analysis_service_data(db, target_id)
        if product_data:
            return product_data

        # Otherwise answer with the minimal information known about the product.
        return _stamped(200, {
            "found": True,
            "basic_info": {
                "product_id": str(product.id),
                "product_name": getattr(product, 'product_name', 'Unknown Product'),
            },
            "message": "Product found but analysis data could not be processed",
        })

    except Exception as e:
        log_error(f"Error in get_analysis_by_marker_id: {str(e)}", e)
        # NOTE(review): unlike the other endpoints in this module, this body
        # exposes the raw exception text to the client — confirm that is intended.
        return _stamped(500, {
            "found": False,
            "error": str(e),
            "message": "Error processing request",
        })