from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Optional
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
import json
import uvicorn
import os
import pandas as pd
import ast
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.feature_extraction.text import TfidfVectorizer
import numpy as np
import urllib.request
import requests
import asyncio
import aiohttp
from bs4 import BeautifulSoup
import re
from urllib.parse import urljoin, urlparse
import time

# Initialize FastAPI app
app = FastAPI(
    title="🍳 Recipe AI Assistant API",
    description="AI-powered recipe recommendations using real recipe database",
    version="2.0.0"
)

# Add CORS middleware for web and mobile access
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # In production, specify your domains
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Global variables
tokenizer = None
model = None
recipes_df = None
interactions_df = None
vectorizer = None
recipe_vectors = None
device = "cuda" if torch.cuda.is_available() else "cpu"

# Request/Response Models
class RecipeRequest(BaseModel):
    ingredients: str
    preferences: Optional[str] = ""
    max_minutes: int = 30
    # Conversation intelligence fields
    user_id: Optional[str] = None
    session_id: Optional[str] = None
    conversation_context: Optional[dict] = None
    user_preferences: Optional[dict] = None
    # Personalization fields
    liked_recipe_ids: List[int] = []
    disliked_recipe_ids: List[int] = []
    dietary_restrictions: List[str] = []
    preferred_cuisines: List[str] = []

class NutritionRequest(BaseModel):
    query: str
    user_id: Optional[str] = None
    previous_queries: List[str] = []

class ChatbotOptionRequest(BaseModel):
    user_input: str
    user_id: Optional[str] = None
    session_id: Optional[str] = None

class UserFeedbackRequest(BaseModel):
    user_id: str
    recipe_id: int
    feedback_type: str  # "like", "dislike", "save"
    interaction_context: Optional[dict] = None

class DatabaseRecipe(BaseModel):
    id: int
    name: str
    description: str
    ingredients: List[str]
    steps: List[str]
    minutes: int
    servings: Optional[int] = None
    nutrition: Optional[dict] = None
    tags: List[str] = []
    confidence: float

class RecipeResponse(BaseModel):
    status: str
    recommendations: List[DatabaseRecipe]
    query: RecipeRequest
    error: Optional[str] = None

class NutritionResponse(BaseModel):
    status: str
    topic: str
    summary: str
    key_points: List[str]
    trusted_sources: List[dict]
    error: Optional[str] = None

class ChatbotOptionResponse(BaseModel):
    status: str
    response_type: str  # "options", "nutrition", "recipe"
    message: str
    options: Optional[List[str]] = None
    nutrition_info: Optional[dict] = None
    recipes: Optional[List[DatabaseRecipe]] = None
    error: Optional[str] = None

class UserFeedbackResponse(BaseModel):
    status: str
    message: str
    updated_preferences: Optional[dict] = None
    error: Optional[str] = None

def safe_eval_list(x):
    """Safely parse string representations of lists"""
    if isinstance(x, list):
        return x
    if isinstance(x, str):
        try:
            # Try to evaluate as Python literal
            result = ast.literal_eval(x)
            if isinstance(result, list):
                return [str(item) for item in result]
        except (ValueError, SyntaxError):
            # Fall back to simple string splitting
            return [item.strip() for item in x.split(',') if item.strip()]
    return []

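# Illustrative sketch of the parser's behaviour (sample values are assumed, not
# taken from the dataset): the Food.com-style CSVs store lists as strings.
#   safe_eval_list("['flour', 'sugar', 'butter']")  -> ['flour', 'sugar', 'butter']
#   safe_eval_list("salt, pepper")                  -> ['salt', 'pepper']  # comma fallback
#   safe_eval_list(None)                            -> []
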
def filter_by_ratings(recipes_df, interactions_df, min_rating=4.0, min_reviews=5):
    """Filter recipes to only include those with good ratings"""
    try:
        print(f"📊 Processing {len(interactions_df)} interactions for rating filter...")

        # Calculate average rating and review count for each recipe
        recipe_stats = interactions_df.groupby('recipe_id').agg({
            'rating': ['mean', 'count'],
            'review': lambda x: x.dropna().apply(lambda review: len(str(review)) > 10).sum()  # Count meaningful reviews
        }).reset_index()

        # Flatten column names
        recipe_stats.columns = ['recipe_id', 'avg_rating', 'rating_count', 'meaningful_reviews']

        # Filter for high-quality recipes
        high_quality = recipe_stats[
            (recipe_stats['avg_rating'] >= min_rating) &
            (recipe_stats['rating_count'] >= min_reviews)
        ]
        print(f"🏆 Found {len(high_quality)} recipes with rating >= {min_rating} and >= {min_reviews} reviews")

        # Join with recipes and keep only high-quality ones
        filtered_recipes = recipes_df.merge(
            high_quality[['recipe_id', 'avg_rating', 'rating_count']],
            left_on='id',
            right_on='recipe_id',
            how='inner'
        )

        # Add rating info to the dataframe
        filtered_recipes['avg_rating'] = filtered_recipes['avg_rating'].round(1)

        print(f"✅ Quality filter complete: {len(filtered_recipes)} highly-rated recipes")
        return filtered_recipes
    except Exception as e:
        print(f"⚠️ Rating filter failed: {e}")
        raise Exception(f"Failed to apply rating filter: {e}")

def load_recipes():
    """Load and process both RAW_recipes.csv and RAW_interactions.csv with rating filtering"""
    global recipes_df, interactions_df, vectorizer, recipe_vectors

    try:
        # Try to load from Hugging Face dataset directly
        print("📊 Attempting to load recipe dataset from Hugging Face...")
        try:
            # Method 1: Try with datasets library
            try:
                from datasets import load_dataset
                print("🔄 Loading from nutrientartcd/recipe-dataset...")
                dataset = load_dataset("nutrientartcd/recipe-dataset")

                # The dataset might not have splits, so try different approaches
                if hasattr(dataset, 'to_pandas'):
                    df = dataset.to_pandas()
                elif 'train' in dataset:
                    df = dataset['train'].to_pandas()
                else:
                    # Get the first available split
                    split_name = list(dataset.keys())[0]
                    df = dataset[split_name].to_pandas()
                print(f"✅ Successfully loaded {len(df)} recipes from Hugging Face datasets!")
            except Exception as datasets_error:
                print(f"⚠️ Datasets library failed: {datasets_error}")

                # Method 2: Direct CSV download from Hugging Face
                print("🔄 Trying direct CSV download from Hugging Face...")
                csv_url = "https://huggingface.co/datasets/nutrientartcd/recipe-dataset/resolve/main/RAW_recipes.csv"
                local_csv = "/tmp/RAW_recipes_downloaded.csv"
                print(f"Downloading from: {csv_url}")
                urllib.request.urlretrieve(csv_url, local_csv)
                df = pd.read_csv(local_csv)
                print(f"✅ Successfully downloaded and loaded {len(df)} recipes from CSV!")

                # Also download interactions CSV for rating filtering
                interactions_url = "https://huggingface.co/datasets/nutrientartcd/recipe-dataset/resolve/main/RAW_interactions.csv"
                local_interactions = "/tmp/RAW_interactions_downloaded.csv"
                print("📊 Downloading interactions data for rating filtering...")
                urllib.request.urlretrieve(interactions_url, local_interactions)
                interactions_df = pd.read_csv(local_interactions)
                print(f"✅ Loaded {len(interactions_df)} interactions for rating filtering!")
        except Exception as hf_error:
            print(f"⚠️ Both Hugging Face methods failed: {hf_error}")

            # Try local paths as fallback
            print("🔄 Trying local CSV files...")
            possible_paths = [
                "RAW_recipes.csv",
                "/tmp/RAW_recipes.csv",
                "./RAW_recipes.csv",
                "../RAW_recipes.csv",
                "/app/RAW_recipes.csv",
                "recipe_data/RAW_recipes.csv"
            ]
            dataset_path = None
            for path in possible_paths:
                if os.path.exists(path):
                    dataset_path = path
                    break
            if dataset_path is None:
                print("❌ No local CSV files found either")
                print("📂 Current working directory:", os.getcwd())
                print("📋 Available files:", [f for f in os.listdir('.') if f.endswith('.csv')][:10])
                raise FileNotFoundError("Neither Hugging Face dataset nor local CSV found")

            print(f"📊 Loading recipes from local file {dataset_path}...")
            df = pd.read_csv(dataset_path)

        # Clean and process the dataframe
        required_cols = ['id', 'name', 'minutes', 'ingredients', 'steps']
        missing_cols = [col for col in required_cols if col not in df.columns]
        if missing_cols:
            raise ValueError(f"Missing required columns: {missing_cols}")

        # Filter recipes based on ratings from interactions
        if interactions_df is not None:
            df = filter_by_ratings(df, interactions_df)
            print(f"📈 After rating filter: {len(df)} high-quality recipes remaining")

        # Parse string lists (add empty defaults if optional columns are absent)
        for optional_col in ('tags', 'nutrition'):
            if optional_col not in df.columns:
                df[optional_col] = '[]'
        if 'description' not in df.columns:
            df['description'] = ''
        df['ingredients'] = df['ingredients'].apply(safe_eval_list)
        df['steps'] = df['steps'].apply(safe_eval_list)
        df['tags'] = df['tags'].apply(safe_eval_list)
        df['nutrition'] = df['nutrition'].apply(safe_eval_list)

        # Clean data
        df = df[
            (df['name'].str.len() > 1) &
            (df['minutes'] > 0) &
            (df['ingredients'].str.len() > 0) &
            (df['steps'].str.len() > 0)
        ].copy()

        # Reset the index so row labels line up with TF-IDF matrix row positions
        # (search_recipes later indexes recipe_vectors with these labels)
        df = df.reset_index(drop=True)

        # Create searchable text fields
        df['ingredients_text'] = df['ingredients'].apply(lambda x: ' '.join(x).lower())
        df['steps_text'] = df['steps'].apply(lambda x: ' '.join(x).lower())
        df['tags_text'] = df['tags'].apply(lambda x: ' '.join(x).lower())
        df['search_text'] = (
            df['name'].str.lower() + ' ' +
            df['ingredients_text'] + ' ' +
            df['tags_text'] + ' ' +
            df['description'].fillna('').astype(str).str.lower()
        )

        # Create TF-IDF vectors for semantic search
        print("🔍 Building search index...")
        vectorizer = TfidfVectorizer(
            max_features=5000,
            stop_words='english',
            ngram_range=(1, 2),
            min_df=2
        )
        recipe_vectors = vectorizer.fit_transform(df['search_text'])
        recipes_df = df
        print(f"✅ Loaded {len(df)} recipes successfully!")
    except Exception as e:
        print(f"❌ Error loading recipes: {e}")
        print(f"📍 Error details: {type(e).__name__}: {str(e)}")
        raise Exception(f"Failed to load recipe database: {e}")

async def get_usda_food_suggestions(query_text, limit=5):
    """Use USDA FoodData Central API to intelligently understand food terms"""
    try:
        # Clean the query to extract potential food terms
        food_words = [word for word in query_text.lower().split()
                      if word not in ['i', 'want', 'recipe', 'recipes', 'for', 'the', 'a', 'an']]
        if not food_words:
            return []

        # Search USDA database for food items
        search_term = ' '.join(food_words[:2])  # Use first 2 meaningful words
        url = "https://api.nal.usda.gov/fdc/v1/foods/search"
        params = {
            'query': search_term,
            'dataType': 'Foundation,SR Legacy',  # Most comprehensive data; comma-separated so it serializes cleanly into the query string
            'pageSize': limit,
            'api_key': 'DEMO_KEY'  # Free demo key, works for testing
        }

        async with aiohttp.ClientSession() as session:
            async with session.get(url, params=params) as response:
                if response.status == 200:
                    data = await response.json()
                    food_suggestions = []
                    for food in data.get('foods', []):
                        description = food.get('description', '').lower()
                        # Extract meaningful food terms from USDA descriptions
                        if description:
                            food_suggestions.append(description)
                    print(f"🥗 USDA found: {food_suggestions[:3]}")
                    return food_suggestions[:3]  # Return top 3 matches
                else:
                    print(f"⚠️ USDA API error: {response.status}")
                    return []
    except Exception as e:
        print(f"⚠️ USDA API failed: {e}")
        return []

async def extract_query_features_with_llm(query_text, preferences="", max_minutes=30):
    """Use USDA API + DialoGPT for truly intelligent food understanding"""
    global tokenizer, model

    full_query = f"{query_text} {preferences}".strip()

    # Start with the original query
    base_search_terms = [full_query]

    # Get intelligent food suggestions from USDA
    usda_suggestions = await get_usda_food_suggestions(query_text)

    # If DialoGPT is available, use it for context enhancement
    llm_enhanced_terms = []
    if model is not None and tokenizer is not None:
        try:
            conversation = f"User: I want to cook {query_text}".strip()
            inputs = tokenizer.encode(conversation + tokenizer.eos_token, return_tensors="pt").to(device)
            outputs = model.generate(
                inputs,
                max_new_tokens=20,
                temperature=0.7,
                top_p=0.9,
                do_sample=True,
                pad_token_id=tokenizer.pad_token_id,
                repetition_penalty=1.2
            )
            response = tokenizer.decode(outputs[0][inputs.shape[1]:], skip_special_tokens=True)

            # Only extract actual food/cooking terms
            for word in response.split():
                word_clean = word.lower().strip('.,!?')
                if len(word_clean) > 3 and word_clean not in ['that', 'have', 'with', 'this', 'your', 'they', 'them']:
                    llm_enhanced_terms.append(word_clean)
            llm_enhanced_terms = llm_enhanced_terms[:2]  # Limit to 2 terms
        except Exception as e:
            print(f"⚠️ DialoGPT failed: {e}")

    # Combine all intelligent suggestions
    all_search_terms = base_search_terms + usda_suggestions + llm_enhanced_terms
    print(f"🧠 Smart search terms: {all_search_terms[:5]}")

    return {
        'original_query': full_query,
        'search_terms': all_search_terms,
        'max_minutes': max_minutes,
        'usda_enhanced': len(usda_suggestions) > 0,
        'llm_enhanced': len(llm_enhanced_terms) > 0
    }

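# Example of the feature dict returned above (values are an assumed
# illustration for the query "quick chicken pasta", not captured output):
#   {
#       'original_query': 'quick chicken pasta',
#       'search_terms': ['quick chicken pasta', 'chicken, broilers or fryers, breast', 'pasta, dry'],
#       'max_minutes': 30,
#       'usda_enhanced': True,
#       'llm_enhanced': False,
#   }
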
def parse_llm_json_response(response_text):
    """Parse LLM's JSON response into structured features"""
    try:
        # Clean the response - remove any non-JSON text
        response_text = response_text.strip()

        # Find JSON content between braces
        start_idx = response_text.find('{')
        end_idx = response_text.rfind('}') + 1
        if start_idx == -1 or end_idx == 0:
            raise ValueError("No JSON found in response")
        json_text = response_text[start_idx:end_idx]

        # Parse JSON
        features = json.loads(json_text)

        # Ensure all expected keys exist with default empty lists
        default_features = {
            'ingredients': [],
            'meal_types': [],
            'cuisines': [],
            'dietary_restrictions': [],
            'cooking_styles': [],
            'cooking_methods': [],
            'flavors': []
        }

        # Merge with defaults
        for key in default_features:
            if key not in features:
                features[key] = []
            elif not isinstance(features[key], list):
                features[key] = [str(features[key])]
        return features
    except Exception as e:
        print(f"⚠️ JSON parsing failed: {e}")
        print(f"Response text: {response_text[:200]}...")

        # Fallback: extract key terms manually
        text_lower = response_text.lower()
        return {
            'ingredients': extract_terms_from_text(text_lower, ['chocolate', 'vanilla', 'sugar', 'flour', 'butter', 'eggs', 'milk']),
            'meal_types': extract_terms_from_text(text_lower, ['dessert', 'breakfast', 'lunch', 'dinner', 'snack']),
            'cuisines': extract_terms_from_text(text_lower, ['italian', 'mexican', 'asian', 'french']),
            'dietary_restrictions': extract_terms_from_text(text_lower, ['vegetarian', 'vegan', 'gluten-free']),
            'cooking_styles': extract_terms_from_text(text_lower, ['quick', 'easy', 'healthy']),
            'cooking_methods': extract_terms_from_text(text_lower, ['baked', 'fried', 'grilled']),
            'flavors': extract_terms_from_text(text_lower, ['sweet', 'savory', 'spicy'])
        }

def extract_terms_from_text(text, terms_list):
    """Helper function to extract terms from text"""
    return [term for term in terms_list if term in text]

def apply_personalization_filters(df, request_data):
    """Apply personalization filters based on user preferences and history"""
    filtered_df = df.copy()

    # Filter out disliked recipes
    if request_data.disliked_recipe_ids:
        filtered_df = filtered_df[~filtered_df['id'].isin(request_data.disliked_recipe_ids)]
        print(f"🚫 Filtered out {len(request_data.disliked_recipe_ids)} disliked recipes")

    # Apply dietary restrictions
    if request_data.dietary_restrictions:
        for restriction in request_data.dietary_restrictions:
            if restriction.lower() == "vegetarian":
                # Filter out meat-based recipes
                meat_keywords = ['beef', 'chicken', 'pork', 'lamb', 'fish', 'salmon', 'tuna']
                for keyword in meat_keywords:
                    filtered_df = filtered_df[~filtered_df['ingredients_text'].str.contains(keyword, case=False, na=False)]
            elif restriction.lower() == "vegan":
                # Filter out animal products
                animal_keywords = ['beef', 'chicken', 'pork', 'lamb', 'fish', 'milk', 'cheese', 'butter', 'egg', 'cream']
                for keyword in animal_keywords:
                    filtered_df = filtered_df[~filtered_df['ingredients_text'].str.contains(keyword, case=False, na=False)]
            elif restriction.lower() == "gluten-free":
                # Filter out gluten-containing ingredients
                gluten_keywords = ['flour', 'wheat', 'bread', 'pasta', 'noodles']
                for keyword in gluten_keywords:
                    filtered_df = filtered_df[~filtered_df['ingredients_text'].str.contains(keyword, case=False, na=False)]
    return filtered_df

def apply_personalization_ranking(df, request_data):
    """Apply personalization ranking boosts based on user preferences"""
    if df.empty or not request_data:
        return df

    # Boost recipes from preferred cuisines
    if request_data.preferred_cuisines:
        for cuisine in request_data.preferred_cuisines:
            cuisine_mask = (
                df['name'].str.lower().str.contains(cuisine.lower(), na=False) |
                df['tags_text'].str.contains(cuisine.lower(), na=False) |
                df['search_text'].str.contains(cuisine.lower(), na=False)
            )
            df.loc[cuisine_mask, 'similarity'] *= 1.5

    # Boost recipes similar to liked ones (simplified - in production use embedding similarity)
    if request_data.liked_recipe_ids:
        # This is a simplified approach - in production you'd use recipe embeddings
        boost_factor = 1.3
        print(f"🎯 Applied personalization boosts for {len(request_data.liked_recipe_ids)} liked recipes")
    return df

def search_recipes(query_features, request_data=None, top_k=10):
    """Enhanced intelligent search with personalization and conversation context"""
    global recipes_df, vectorizer, recipe_vectors

    if recipes_df is None:
        load_recipes()

    # Filter by time constraint
    filtered_df = recipes_df[recipes_df['minutes'] <= query_features['max_minutes']].copy()
    if len(filtered_df) == 0:
        filtered_df = recipes_df.copy()  # Fall back to all recipes

    # Apply personalization filters if available
    if request_data:
        filtered_df = apply_personalization_filters(filtered_df, request_data)

    # Create search query from all terms (original query + DialoGPT enhancements)
    search_query = ' '.join(query_features['search_terms'])

    if search_query and vectorizer is not None:
        # Semantic search using TF-IDF on the full query
        query_vector = vectorizer.transform([search_query])

        # Get vectors for the filtered subset by re-indexing
        filtered_indices = filtered_df.index.tolist()
        try:
            # Make sure indices are within bounds
            valid_indices = [i for i in filtered_indices if i < recipe_vectors.shape[0]]
            if valid_indices:
                filtered_vectors = recipe_vectors[valid_indices]
                similarities = cosine_similarity(query_vector, filtered_vectors).flatten()
                # Update filtered_df to only include valid indices
                filtered_df = filtered_df.loc[valid_indices]
            else:
                # No valid indices, fall back to neutral scores
                similarities = np.array([0.5] * len(filtered_df))
        except Exception as e:
            print(f"⚠️ Vector indexing error: {e}, falling back to neutral scores")
            similarities = np.array([0.5] * len(filtered_df))

        # Add similarity scores (ensure lengths match)
        filtered_df = filtered_df.copy()
        if len(similarities) == len(filtered_df):
            filtered_df['similarity'] = similarities
        else:
            print(f"⚠️ Similarity length mismatch: {len(similarities)} vs {len(filtered_df)}")
            filtered_df['similarity'] = 0.5

        # Simple boosting based on query content detection
        original_query = query_features.get('original_query', '').lower()

        # Boost for dessert-related queries
        if any(word in original_query for word in ['dessert', 'sweet', 'chocolate', 'cake', 'cookie']):
            dessert_patterns = ['chocolate', 'cake', 'cookie', 'dessert', 'sweet', 'brownie', 'pie']
            for pattern in dessert_patterns:
                mask = (filtered_df['name'].str.lower().str.contains(pattern, na=False) |
                        filtered_df['search_text'].str.contains(pattern, na=False))
                filtered_df.loc[mask, 'similarity'] *= 2.0

        # Boost for specific food mentions (burger, pasta, etc.)
        food_words = [word for word in original_query.split() if len(word) > 3]
        for word in food_words:
            if word not in ['want', 'like', 'something', 'recipes', 'recipe']:
                mask = (filtered_df['name'].str.lower().str.contains(word, na=False) |
                        filtered_df['ingredients_text'].str.contains(word, na=False) |
                        filtered_df['search_text'].str.contains(word, na=False))
                filtered_df.loc[mask, 'similarity'] *= 1.5

        # Apply personalization ranking if request data available
        if request_data:
            filtered_df = apply_personalization_ranking(filtered_df, request_data)

        # Sort by similarity (descending)
        filtered_df = filtered_df.sort_values('similarity', ascending=False)

        # Log the top results for debugging
        print(f"🔍 Search results for '{search_query}':")
        for i, (_, recipe) in enumerate(filtered_df.head(3).iterrows()):
            print(f"  {i+1}. {recipe['name']} (sim: {recipe['similarity']:.3f})")
    else:
        # Fallback: random selection
        filtered_df = filtered_df.sample(min(len(filtered_df), top_k * 2), random_state=42)
        filtered_df['similarity'] = 0.5

    return filtered_df.head(top_k)

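# Minimal usage sketch (assumes load_recipes() has already populated the
# globals; feature extraction is async, so run it inside an event loop):
#   features = await extract_query_features_with_llm("vegetarian curry", max_minutes=45)
#   top = search_recipes(features, request_data=None, top_k=5)
#   print(top[['name', 'minutes', 'similarity']])
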
# New enhanced chatbot endpoint - option selection
@app.post("/chatbot/options")  # route path assumed; not given in the source
async def chatbot_options(request: ChatbotOptionRequest):
    """
    Enhanced chatbot that gives users the option between nutrition recommendations and recipes
    """
    try:
        user_input = request.user_input.lower().strip()

        # Check if user is asking for a specific type of help
        if any(word in user_input for word in ["nutrition", "healthy", "vitamin", "mineral", "diet", "health"]):
            return ChatbotOptionResponse(
                status="success",
                response_type="nutrition",
                message="I can help you with nutrition information! What specific topic would you like to learn about?",
                options=["Vitamins & Minerals", "Heart Health", "Weight Management", "Diabetes Nutrition", "General Nutrition Tips"]
            )
        elif any(word in user_input for word in ["recipe", "cook", "meal", "food", "ingredients"]):
            return ChatbotOptionResponse(
                status="success",
                response_type="recipe",
                message="I can help you find recipes! Tell me what ingredients you have or what type of meal you'd like.",
                options=["Quick Meals (15-30 min)", "Healthy Options", "Comfort Food", "Vegetarian", "Use My Ingredients"]
            )
        else:
            # Initial greeting - present both options
            return ChatbotOptionResponse(
                status="success",
                response_type="options",
                message="Hello! I'm your nutrition and recipe assistant. How can I help you today?",
                options=["🍎 Get nutrition recommendations", "🍳 Find recipe recommendations"]
            )
    except Exception as e:
        return ChatbotOptionResponse(
            status="error",
            response_type="options",
            message="Sorry, I encountered an error. Please try again.",
            error=str(e)
        )

# Nutrition information endpoint
@app.post("/nutrition")  # route path assumed; not given in the source
async def get_nutrition_info(request: NutritionRequest):
    """
    Provides nutritional recommendations with trustworthy sources
    """
    try:
        query = request.query.lower().strip()

        # Generate nutrition response using intelligent web scraping
        nutrition_info = await generate_intelligent_nutrition_response(query)

        return NutritionResponse(
            status="success",
            topic=nutrition_info["topic"],
            summary=nutrition_info["summary"],
            key_points=nutrition_info["key_points"],
            trusted_sources=nutrition_info["sources"]
        )
    except Exception as e:
        return NutritionResponse(
            status="error",
            topic="Error",
            summary="Failed to retrieve nutrition information",
            key_points=[],
            trusted_sources=[],
            error=str(e)
        )

# User feedback endpoint for reinforcement learning
@app.post("/feedback")  # route path assumed; not given in the source
async def record_user_feedback(request: UserFeedbackRequest):
    """
    Records user feedback for reinforcement learning improvements
    """
    try:
        # In a real implementation, this would store feedback in a database
        # For now, we'll log it and return success
        print(f"📊 User feedback: User {request.user_id} {request.feedback_type} recipe {request.recipe_id}")

        # Here you would typically:
        # 1. Store the feedback in a database
        # 2. Update user preference models
        # 3. Trigger retraining of recommendation models

        return UserFeedbackResponse(
            status="success",
            message=f"Thank you for your feedback! Your {request.feedback_type} has been recorded.",
            updated_preferences={"learning": True}
        )
    except Exception as e:
        return UserFeedbackResponse(
            status="error",
            message="Failed to record feedback",
            error=str(e)
        )

# Web scraping and content extraction
class WebScraper:
    def __init__(self):
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        self.cache = {}  # Simple in-memory cache
        self.cache_duration = 3600  # 1 hour cache

    async def scrape_url(self, url: str) -> dict:
        """Scrape content from a single URL"""
        try:
            # Check cache first
            cache_key = url
            if cache_key in self.cache:
                cached_data, timestamp = self.cache[cache_key]
                if time.time() - timestamp < self.cache_duration:
                    return cached_data

            async with aiohttp.ClientSession() as session:
                async with session.get(url, headers=self.headers, timeout=10) as response:
                    if response.status == 200:
                        html = await response.text()
                        soup = BeautifulSoup(html, 'html.parser')

                        # Extract meaningful content
                        content = self.extract_content(soup, url)

                        # Cache the result
                        self.cache[cache_key] = (content, time.time())
                        return content
                    else:
                        return {"error": f"HTTP {response.status}"}
        except Exception as e:
            return {"error": str(e)}

    def extract_content(self, soup: BeautifulSoup, url: str) -> dict:
        """Extract meaningful content from BeautifulSoup object"""
        # Remove script and style elements
        for script in soup(["script", "style", "nav", "footer", "header"]):
            script.decompose()

        # Try to find the main content area
        main_content = soup.find('main') or soup.find('article') or soup.find('div', class_=re.compile(r'content|main|article'))
        if not main_content:
            main_content = soup.find('body')

        # Extract title
        title = ""
        title_tag = soup.find('title')
        if title_tag:
            title = title_tag.get_text().strip()

        # Extract headings and paragraphs
        headings = []
        paragraphs = []
        if main_content:
            # Get headings (h1, h2, h3)
            for heading in main_content.find_all(['h1', 'h2', 'h3']):
                heading_text = heading.get_text().strip()
                if heading_text and len(heading_text) < 200:
                    headings.append(heading_text)

            # Get paragraphs
            for p in main_content.find_all('p'):
                p_text = p.get_text().strip()
                if p_text and len(p_text) > 50:  # Filter out short paragraphs
                    paragraphs.append(p_text)

        # Extract lists (ul, ol)
        lists = []
        if main_content:
            for ul in main_content.find_all(['ul', 'ol']):
                list_items = []
                for li in ul.find_all('li'):
                    li_text = li.get_text().strip()
                    if li_text and len(li_text) < 300:
                        list_items.append(li_text)
                if list_items:
                    lists.append(list_items)

        return {
            "title": title,
            "url": url,
            "domain": urlparse(url).netloc,
            "headings": headings[:10],      # Limit to first 10 headings
            "paragraphs": paragraphs[:15],  # Limit to first 15 paragraphs
            "lists": lists[:5],             # Limit to first 5 lists
            "scraped_at": time.time()
        }

    async def scrape_multiple_urls(self, urls: list) -> list:
        """Scrape multiple URLs concurrently"""
        tasks = [self.scrape_url(url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Filter out exceptions and errors
        valid_results = []
        for result in results:
            if isinstance(result, dict) and "error" not in result:
                valid_results.append(result)
        return valid_results

# Initialize scraper
web_scraper = WebScraper()

def get_trusted_urls_for_query(query: str) -> list:
    """Get relevant trusted URLs based on the query"""
    query_lower = query.lower()
    urls = []

    # Weight loss / management
    if any(phrase in query_lower for phrase in ["lose weight", "weight loss", "weight management"]):
        urls.extend([
            "https://www.cdc.gov/healthyweight/losing_weight/index.html",
            "https://www.niddk.nih.gov/health-information/weight-management/choosing-a-safe-successful-weight-loss-program",
            "https://www.mayoclinic.org/healthy-lifestyle/weight-loss/basics/weightloss-basics/hlv-20049483"
        ])
    # Heart health
    elif any(phrase in query_lower for phrase in ["heart", "cardiovascular", "cholesterol"]):
        urls.extend([
            "https://www.heart.org/en/healthy-living/healthy-eating/eat-smart/nutrition-basics",
            "https://www.nhlbi.nih.gov/education/dash-eating-plan",
            "https://www.mayoclinic.org/diseases-conditions/heart-disease/in-depth/heart-healthy-diet/art-20047702"
        ])
    # Diabetes
    elif any(phrase in query_lower for phrase in ["diabetes", "blood sugar"]):
        urls.extend([
            "https://www.cdc.gov/diabetes/managing/eat-well.html",
            "https://www.niddk.nih.gov/health-information/diabetes/overview/diet-eating-physical-activity",
            "https://diabetes.org/food-nutrition"
        ])
    # Vitamins and supplements
    elif any(word in query_lower for word in ["vitamin", "supplement", "mineral"]):
        urls.extend([
            "https://ods.od.nih.gov/factsheets/list-all/",
            "https://www.nutrition.gov/topics/dietary-supplements",
            "https://www.mayoclinic.org/healthy-lifestyle/nutrition-and-healthy-eating/in-depth/supplements/art-20044894"
        ])
    # General nutrition
    else:
        urls.extend([
            "https://www.nutrition.gov/topics/basic-nutrition",
            "https://www.cdc.gov/nutrition/guidelines.html",
            "https://www.choosemyplate.gov/"
        ])
    return urls[:3]  # Limit to 3 URLs to avoid overwhelming the system

async def generate_intelligent_nutrition_response(query: str) -> dict:
    """Generate nutrition response by scraping and summarizing trusted sources"""
    # Get relevant URLs
    trusted_urls = get_trusted_urls_for_query(query)

    # Scrape the URLs
    scraped_data = await web_scraper.scrape_multiple_urls(trusted_urls)

    if not scraped_data:
        # Fallback to static response if scraping fails
        return generate_static_nutrition_response(query)

    # Combine and summarize the scraped content
    combined_content = ""
    sources = []
    for data in scraped_data:
        # Add to sources
        sources.append({
            "title": data["title"],
            "url": data["url"],
            "domain": data["domain"],
            "credibility_score": get_credibility_score(data["domain"])
        })

        # Combine content for summarization
        content_parts = []
        content_parts.extend(data["headings"])
        content_parts.extend(data["paragraphs"][:5])  # First 5 paragraphs

        # Add list items
        for list_items in data["lists"]:
            content_parts.extend(list_items[:3])  # First 3 items from each list
        combined_content += " ".join(content_parts) + " "

    # Generate summary using the scraped content
    summary, key_points = summarize_nutrition_content(combined_content, query)

    # Determine topic from query
    topic = determine_nutrition_topic(query)

    return {
        "topic": topic,
        "summary": summary,
        "key_points": key_points,
        "sources": sources,
        "scraped_from": len(scraped_data),
        "query_analyzed": query
    }

def get_credibility_score(domain: str) -> float:
    """Get credibility score for a domain"""
    # Normalize so a scraped netloc like "www.cdc.gov" matches the "cdc.gov" entry
    domain = domain.lower()
    if domain.startswith("www."):
        domain = domain[4:]
    scores = {
        "cdc.gov": 0.95,
        "nih.gov": 0.98,
        "niddk.nih.gov": 0.98,
        "nutrition.gov": 0.95,
        "mayoclinic.org": 0.90,
        "heart.org": 0.92,
        "diabetes.org": 0.93,
        "choosemyplate.gov": 0.90,
        "nhlbi.nih.gov": 0.95,
        "ods.od.nih.gov": 0.98
    }
    return scores.get(domain, 0.75)

def summarize_nutrition_content(content: str, query: str) -> tuple:
    """Summarize nutrition content and extract key points"""
    # Clean the content
    content = re.sub(r'\s+', ' ', content)  # Remove extra whitespace
    content = content[:3000]  # Limit content length

    # Use simple summarization for now (could use LLM later)
    sentences = content.split('.')

    # Find most relevant sentences based on query keywords
    query_words = query.lower().split()
    relevant_sentences = []
    for sentence in sentences:
        sentence = sentence.strip()
        if len(sentence) > 20:
            # Score sentence based on query word matches
            score = sum(1 for word in query_words if word in sentence.lower())
            if score > 0:
                relevant_sentences.append((score, sentence))

    # Sort by relevance and take top sentences
    relevant_sentences.sort(key=lambda x: x[0], reverse=True)

    # Create summary from top 3 relevant sentences
    summary_sentences = [sent[1] for sent in relevant_sentences[:3]]
    summary = ". ".join(summary_sentences)
    if not summary:
        summary = "Evidence-based nutrition information from trusted health organizations."

    # Extract key points (look for list-like content)
    key_points = []
    for sentence in sentences:
        sentence = sentence.strip()
        if any(starter in sentence.lower() for starter in ["eat ", "choose ", "limit ", "include ", "avoid ", "consume "]):
            if len(sentence) > 20 and len(sentence) < 150:
                key_points.append(sentence.capitalize())

    # Ensure we have at least 4 key points
    if len(key_points) < 4:
        key_points.extend([
            "Eat a variety of nutrient-dense foods from all food groups",
            "Practice portion control and mindful eating",
            "Stay hydrated with water as your primary beverage",
            "Consult healthcare professionals for personalized advice"
        ])
    return summary[:500], key_points[:6]  # Limit summary and key points

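# Illustrative behaviour (assumed input, not captured output): for the query
# "heart health", a scraped sentence like "Choose foods low in saturated fat
# to protect your heart" scores on the keyword "heart" (so it can enter the
# summary) and also contains "choose ", so it is captured as a key point.
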
def determine_nutrition_topic(query: str) -> str:
    """Determine the main nutrition topic from the query"""
    query_lower = query.lower()
    if any(phrase in query_lower for phrase in ["lose weight", "weight loss"]):
        return "Weight Loss Nutrition"
    elif any(phrase in query_lower for phrase in ["gain weight", "build muscle"]):
        return "Healthy Weight Gain"
    elif any(phrase in query_lower for phrase in ["heart", "cardiovascular"]):
        return "Heart-Healthy Nutrition"
    elif any(phrase in query_lower for phrase in ["diabetes", "blood sugar"]):
        return "Diabetes Nutrition Management"
    elif any(word in query_lower for word in ["vitamin", "supplement"]):
        return "Vitamins and Supplements"
    else:
        return "General Nutrition Guidelines"

def generate_static_nutrition_response(query: str) -> dict:
    """Fallback static response when scraping fails"""
    # Your existing static response logic here
    return {
        "topic": "General Nutrition",
        "summary": "Unable to fetch current information. Please try again later.",
        "key_points": ["Consult healthcare professionals for nutrition advice"],
        "sources": []
    }

def generate_nutrition_response(query: str) -> dict:
    """
    Legacy sync wrapper for the async function
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(generate_intelligent_nutrition_response(query))
    finally:
        loop.close()

# Load model on startup
@app.on_event("startup")  # startup hook assumed (implied by the comment above)
async def load_model():
    global tokenizer, model
    try:
        print("🚀 Loading DialoGPT for Recipe Intelligence...")

        # Use DialoGPT-small - lightweight and great for conversational understanding
        model_name = "microsoft/DialoGPT-small"

        # Load tokenizer
        print("📚 Loading DialoGPT tokenizer...")
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token

        # Load model - much lighter than Llama 2
        print("🤖 Loading DialoGPT model (optimized for HF Spaces)...")
        model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=torch.float16 if device == "cuda" else torch.float32,
            low_cpu_mem_usage=True
        ).to(device)
        model.eval()
        print(f"✅ DialoGPT model loaded successfully on {device}!")

        # Load recipe database
        load_recipes()
    except Exception as e:
        print(f"❌ Error loading DialoGPT model: {e}")
        print("Falling back to enhanced rule-based processing...")
        # Don't fail completely - we can still work with enhanced rule-based extraction
        tokenizer = None
        model = None
        load_recipes()

# Health check endpoint
@app.get("/")  # route path assumed; not given in the source
async def root():
    if recipes_df is None:
        load_recipes()
    return {
        "message": "🍳 Recipe AI Assistant API v2.0",
        "status": "healthy",
        "model_loaded": model is not None,
        "recipes_loaded": recipes_df is not None,
        "recipe_count": len(recipes_df) if recipes_df is not None else 0,
        "device": device,
        "current_directory": os.getcwd(),
        "available_files": [f for f in os.listdir('.') if f.endswith('.csv')][:5]
    }

# Debug endpoint to check recipe database content
@app.get("/debug/search")  # route path assumed; not given in the source
async def debug_search_recipes(query: str):
    """Debug endpoint to check if specific terms exist in the recipe database"""
    if recipes_df is None:
        load_recipes()

    query_lower = query.lower()

    # Search in recipe names
    name_matches = recipes_df[recipes_df['name'].str.lower().str.contains(query_lower, na=False)]

    # Search in ingredients
    ingredient_matches = recipes_df[recipes_df['ingredients_text'].str.contains(query_lower, na=False)]

    # Search in all searchable text
    full_text_matches = recipes_df[recipes_df['search_text'].str.contains(query_lower, na=False)]

    return {
        "query": query,
        "total_recipes": len(recipes_df),
        "name_matches": {
            "count": len(name_matches),
            "examples": name_matches['name'].head(5).tolist() if len(name_matches) > 0 else []
        },
        "ingredient_matches": {
            "count": len(ingredient_matches),
            "examples": ingredient_matches['name'].head(5).tolist() if len(ingredient_matches) > 0 else []
        },
        "full_text_matches": {
            "count": len(full_text_matches),
            "examples": full_text_matches['name'].head(5).tolist() if len(full_text_matches) > 0 else []
        }
    }

# Health check endpoint
@app.get("/health")  # route path assumed; not given in the source
async def health_check():
    return {
        "status": "healthy",
        "model_status": "loaded" if model is not None else "not_loaded",
        "recipes_status": "loaded" if recipes_df is not None else "not_loaded",
        "recipe_count": len(recipes_df) if recipes_df is not None else 0,
        "device": device
    }

# Main recipe recommendation endpoint
@app.post("/recipes")  # route path assumed; not given in the source
async def get_recipe_suggestions(request: RecipeRequest):
    try:
        if recipes_df is None:
            load_recipes()

        print(f"📥 Recipe request: {request.ingredients}, prefs: {request.preferences}, time: {request.max_minutes}")

        # Use USDA API + LLM for intelligent feature extraction
        query_features = await extract_query_features_with_llm(
            request.ingredients,
            request.preferences,
            request.max_minutes
        )

        # Search for matching recipes with personalization
        matching_recipes = search_recipes(query_features, request_data=request, top_k=5)

        # Convert to response format
        recommendations = []
        for _, recipe in matching_recipes.iterrows():
            # Parse nutrition if available
            nutrition = None
            if isinstance(recipe.get('nutrition'), list) and len(recipe['nutrition']) > 0:
                try:
                    if isinstance(recipe['nutrition'][0], str):
                        nutrition_list = ast.literal_eval(recipe['nutrition'][0])
                    else:
                        nutrition_list = recipe['nutrition']
                    if len(nutrition_list) >= 7:  # Ensure we have enough nutrition values
                        nutrition = {
                            "calories": float(nutrition_list[0]) if nutrition_list[0] else 0,
                            "fat": float(nutrition_list[1]) if nutrition_list[1] else 0,
                            "sugar": float(nutrition_list[2]) if nutrition_list[2] else 0,
                            "sodium": float(nutrition_list[3]) if nutrition_list[3] else 0,
                            "protein": float(nutrition_list[4]) if nutrition_list[4] else 0,
                            "saturated_fat": float(nutrition_list[5]) if nutrition_list[5] else 0,
                            "carbs": float(nutrition_list[6]) if nutrition_list[6] else 0
                        }
                except (ValueError, SyntaxError, TypeError, IndexError):
                    nutrition = None

            # Clean the data to handle NaN values
            clean_description = recipe.get('description', '')
            if pd.isna(clean_description) or clean_description is None:
                clean_description = ''
            clean_name = recipe.get('name', 'Untitled Recipe')
            if pd.isna(clean_name):
                clean_name = 'Untitled Recipe'

            # Ensure minutes is valid
            recipe_minutes = recipe.get('minutes', 30)
            if pd.isna(recipe_minutes) or recipe_minutes <= 0:
                recipe_minutes = 30

            # Guard servings against NaN before it reaches the int-typed response field
            servings_value = recipe.get('servings', recipe.get('n_ingredients', 4))
            if pd.isna(servings_value):
                servings_value = 4

            # Use avg_rating as confidence, normalized to a 0-1 scale for 5-star display;
            # recipes missing avg_rating default to 4.5 (i.e. 0.9 confidence)
            recipe_confidence = float(recipe.get('avg_rating', 4.5)) / 5.0

            db_recipe = DatabaseRecipe(
                id=int(recipe['id']),
                name=str(clean_name),
                description=str(clean_description),
                ingredients=recipe['ingredients'],
                steps=recipe['steps'],
                minutes=int(recipe_minutes),
                servings=int(servings_value),
                nutrition=nutrition,
                tags=recipe['tags'],
                confidence=recipe_confidence
            )
            recommendations.append(db_recipe)

        return RecipeResponse(
            status="success",
            recommendations=recommendations,
            query=request
        )
    except Exception as e:
        print(f"❌ Error generating recommendations: {e}")
        raise HTTPException(status_code=500, detail=str(e))

if __name__ == "__main__":
    port = int(os.environ.get("PORT", 7860))
    uvicorn.run(
        "app:app",
        host="0.0.0.0",
        port=port,
        reload=False
    )
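
# Example client call (sketch only; the "/recipes" path mirrors the assumed
# route decorator above and is not part of the original source):
#   curl -X POST http://localhost:7860/recipes \
#        -H "Content-Type: application/json" \
#        -d '{"ingredients": "chicken, rice", "preferences": "spicy", "max_minutes": 30}'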