import ast
import asyncio
import json
import os
import re
import time
import urllib.request
from typing import List, Optional
from urllib.parse import urljoin, urlparse

import aiohttp
import numpy as np
import pandas as pd
import requests
import torch
import uvicorn
from bs4 import BeautifulSoup
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from transformers import AutoTokenizer, AutoModelForCausalLM

# Initialize FastAPI app
app = FastAPI(
    title="🍳 Recipe AI Assistant API",
    description="AI-powered recipe recommendations using real recipe database",
    version="2.0.0"
)

# Add CORS middleware for web and mobile access.
# Origins default to "*" (unchanged behaviour) but can be restricted without a
# code change via a comma-separated CORS_ALLOW_ORIGINS environment variable.
# NOTE(review): wildcard origins combined with allow_credentials=True is
# discouraged by the FastAPI CORS docs — set CORS_ALLOW_ORIGINS in production.
_cors_origins = [
    origin.strip()
    for origin in os.environ.get("CORS_ALLOW_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Global variables (populated by load_model() / load_recipes())
tokenizer = None
model = None
recipes_df = None
interactions_df = None
vectorizer = None
recipe_vectors = None
device = "cuda" if torch.cuda.is_available() else "cpu"

# Timeout applied to every outbound aiohttp request made by this module.
_HTTP_TIMEOUT_SECONDS = 10

# Words stripped from a user query before it is sent to the USDA search API.
_USDA_QUERY_STOPWORDS = frozenset(
    ['i', 'want', 'recipe', 'recipes', 'for', 'the', 'a', 'an']
)

# Filler words ignored when harvesting terms from the DialoGPT reply.
_LLM_FILLER_WORDS = frozenset(
    ['that', 'have', 'with', 'this', 'your', 'they', 'them']
)

# Ingredient substrings that disqualify a recipe for a dietary restriction.
_DIETARY_EXCLUSIONS = {
    "vegetarian": ['beef', 'chicken', 'pork', 'lamb', 'fish', 'salmon', 'tuna'],
    "vegan": ['beef', 'chicken', 'pork', 'lamb', 'fish', 'milk', 'cheese',
              'butter', 'egg', 'cream'],
    "gluten-free": ['flour', 'wheat', 'bread', 'pasta', 'noodles'],
}

_HF_DATASET_BASE_URL = (
    "https://huggingface.co/datasets/nutrientartcd/recipe-dataset/resolve/main"
)


# Request/Response Models
class RecipeRequest(BaseModel):
    """Payload for /api/recipe-suggestions."""
    ingredients: str
    preferences: Optional[str] = ""
    max_minutes: int = 30
    # Conversation intelligence fields
    user_id: Optional[str] = None
    session_id: Optional[str] = None
    conversation_context: Optional[dict] = None
    user_preferences: Optional[dict] = None
    # Personalization fields
    liked_recipe_ids: List[int] = []
    disliked_recipe_ids: List[int] = []
    dietary_restrictions: List[str] = []
    preferred_cuisines: List[str] = []


class NutritionRequest(BaseModel):
    """Payload for /api/nutrition-info."""
    query: str
    user_id: Optional[str] = None
    previous_queries: List[str] = []


class ChatbotOptionRequest(BaseModel):
    """Payload for /api/chatbot-options."""
    user_input: str
    user_id: Optional[str] = None
    session_id: Optional[str] = None


class UserFeedbackRequest(BaseModel):
    """Payload for /api/user-feedback."""
    user_id: str
    recipe_id: int
    feedback_type: str  # "like", "dislike", "save"
    interaction_context: Optional[dict] = None


class DatabaseRecipe(BaseModel):
    """A single recipe as returned to API clients."""
    id: int
    name: str
    description: str
    ingredients: List[str]
    steps: List[str]
    minutes: int
    servings: Optional[int] = None
    nutrition: Optional[dict] = None
    tags: List[str] = []
    confidence: float


class RecipeResponse(BaseModel):
    """Response body of /api/recipe-suggestions."""
    status: str
    recommendations: List[DatabaseRecipe]
    query: RecipeRequest
    error: Optional[str] = None


class NutritionResponse(BaseModel):
    """Response body of /api/nutrition-info."""
    status: str
    topic: str
    summary: str
    key_points: List[str]
    trusted_sources: List[dict]
    error: Optional[str] = None


class ChatbotOptionResponse(BaseModel):
    """Response body of /api/chatbot-options."""
    status: str
    response_type: str  # "options", "nutrition", "recipe"
    message: str
    options: Optional[List[str]] = None
    nutrition_info: Optional[dict] = None
    recipes: Optional[List[DatabaseRecipe]] = None
    error: Optional[str] = None


class UserFeedbackResponse(BaseModel):
    """Response body of /api/user-feedback."""
    status: str
    message: str
    updated_preferences: Optional[dict] = None
    error: Optional[str] = None


def safe_eval_list(x):
    """Safely parse a string representation of a list.

    Lists are returned unchanged. Strings are parsed with
    ``ast.literal_eval``; a list/tuple literal yields its items as strings, a
    scalar literal yields a one-element list, and anything unparsable is
    split on commas. Every other input type yields ``[]``.
    """
    if isinstance(x, list):
        return x
    if not isinstance(x, str):
        return []

    try:
        parsed = ast.literal_eval(x)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        # Not a Python literal: fall back to simple string splitting.
        return [item.strip() for item in x.split(',') if item.strip()]

    if isinstance(parsed, (list, tuple)):
        return [str(item) for item in parsed]
    if parsed is None:
        return []
    # A scalar literal such as "'salt'" or "5" is a single item, not "no items".
    return [str(parsed)]


def filter_by_ratings(recipes_df, interactions_df, min_rating=4.0, min_reviews=5):
    """Filter recipes to only include those with good ratings.

    Args:
        recipes_df: Recipes with an ``id`` column.
        interactions_df: Interactions with ``recipe_id`` and ``rating`` columns.
        min_rating: Minimum mean rating a recipe must have.
        min_reviews: Minimum number of ratings a recipe must have.

    Returns:
        The inner join of ``recipes_df`` with the qualifying recipes, carrying
        extra ``recipe_id``, ``avg_rating`` (rounded to 1 decimal) and
        ``rating_count`` columns.

    Raises:
        Exception: if aggregation or the merge fails (original error chained).
    """
    try:
        print(f"📊 Processing {len(interactions_df)} interactions for rating filter...")

        # Average rating and rating count per recipe.
        recipe_stats = (
            interactions_df.groupby('recipe_id')['rating']
            .agg(avg_rating='mean', rating_count='count')
            .reset_index()
        )

        high_quality = recipe_stats[
            (recipe_stats['avg_rating'] >= min_rating)
            & (recipe_stats['rating_count'] >= min_reviews)
        ]
        print(f"🏆 Found {len(high_quality)} recipes with rating >= {min_rating} and >= {min_reviews} reviews")

        # Join with recipes and keep only high-quality ones.
        filtered_recipes = recipes_df.merge(
            high_quality[['recipe_id', 'avg_rating', 'rating_count']],
            left_on='id',
            right_on='recipe_id',
            how='inner'
        )
        filtered_recipes['avg_rating'] = filtered_recipes['avg_rating'].round(1)

        print(f"✅ Quality filter complete: {len(filtered_recipes)} highly-rated recipes")
        return filtered_recipes
    except Exception as e:
        print(f"⚠️ Rating filter failed: {e}")
        raise Exception(f"Failed to apply rating filter: {e}") from e


def _load_dataset_from_huggingface():
    """Fetch the recipe data from Hugging Face.

    Tries the ``datasets`` library first, then a direct CSV download.
    Returns ``(recipes, interactions)``; ``interactions`` is ``None`` when the
    ``datasets`` route succeeded (that route does not fetch interactions).
    Raises whatever the CSV download raises if both routes fail.
    """
    # Method 1: Try with datasets library
    try:
        from datasets import load_dataset
        print("🔄 Loading from nutrientartcd/recipe-dataset...")
        dataset = load_dataset("nutrientartcd/recipe-dataset")

        # The dataset might not have splits, so try different approaches
        if hasattr(dataset, 'to_pandas'):
            df = dataset.to_pandas()
        elif 'train' in dataset:
            df = dataset['train'].to_pandas()
        else:
            # Get the first available split
            split_name = list(dataset.keys())[0]
            df = dataset[split_name].to_pandas()

        print(f"✅ Successfully loaded {len(df)} recipes from Hugging Face datasets!")
        return df, None
    except Exception as datasets_error:
        print(f"⚠️ Datasets library failed: {datasets_error}")

    # Method 2: Direct CSV download from Hugging Face
    print("🔄 Trying direct CSV download from Hugging Face...")
    csv_url = f"{_HF_DATASET_BASE_URL}/RAW_recipes.csv"
    local_csv = "/tmp/RAW_recipes_downloaded.csv"
    print(f"Downloading from: {csv_url}")
    urllib.request.urlretrieve(csv_url, local_csv)
    df = pd.read_csv(local_csv)
    print(f"✅ Successfully downloaded and loaded {len(df)} recipes from CSV!")

    # Also download interactions CSV for rating filtering
    interactions_url = f"{_HF_DATASET_BASE_URL}/RAW_interactions.csv"
    local_interactions = "/tmp/RAW_interactions_downloaded.csv"
    print("📊 Downloading interactions data for rating filtering...")
    urllib.request.urlretrieve(interactions_url, local_interactions)
    interactions = pd.read_csv(local_interactions)
    print(f"✅ Loaded {len(interactions)} interactions for rating filtering!")
    return df, interactions


def _load_recipes_from_local_csv():
    """Load RAW_recipes.csv from the first known local path that exists."""
    print("🔄 Trying local CSV files...")
    possible_paths = [
        "RAW_recipes.csv",
        "/tmp/RAW_recipes.csv",
        "./RAW_recipes.csv",
        "../RAW_recipes.csv",
        "/app/RAW_recipes.csv",
        "recipe_data/RAW_recipes.csv"
    ]
    dataset_path = next((p for p in possible_paths if os.path.exists(p)), None)

    if dataset_path is None:
        print("❌ No local CSV files found either")
        print("📂 Current working directory:", os.getcwd())
        print("📋 Available files:", [f for f in os.listdir('.') if f.endswith('.csv')][:10])
        raise FileNotFoundError("Neither Hugging Face dataset nor local CSV found")

    print(f"📊 Loading recipes from local file {dataset_path}...")
    return pd.read_csv(dataset_path)


def load_recipes():
    """Load and process both RAW_recipes.csv and RAW_interactions.csv with rating filtering.

    Populates the module globals ``recipes_df``, ``interactions_df``,
    ``vectorizer`` and ``recipe_vectors``. The search globals are assigned
    together only after the whole index has been built, and ``recipes_df`` is
    given a fresh 0..n-1 index so that row ``i`` of ``recipe_vectors`` always
    corresponds to index label ``i``.

    Raises:
        Exception: if no data source is available or processing fails
            (original error chained).
    """
    global recipes_df, interactions_df, vectorizer, recipe_vectors

    try:
        # Try to load from Hugging Face dataset directly
        print("📊 Attempting to load recipe dataset from Hugging Face...")
        try:
            df, downloaded_interactions = _load_dataset_from_huggingface()
            if downloaded_interactions is not None:
                interactions_df = downloaded_interactions
        except Exception as hf_error:
            print(f"⚠️ Both Hugging Face methods failed: {hf_error}")
            # Try local paths as fallback
            df = _load_recipes_from_local_csv()

        # Clean and process the dataframe
        required_cols = ['id', 'name', 'minutes', 'ingredients', 'steps']
        missing_cols = [col for col in required_cols if col not in df.columns]
        if missing_cols:
            raise ValueError(f"Missing required columns: {missing_cols}")

        # Filter recipes based on ratings from interactions
        if interactions_df is not None:
            df = filter_by_ratings(df, interactions_df)
            print(f"📈 After rating filter: {len(df)} high-quality recipes remaining")

        # Parse string lists
        df['ingredients'] = df['ingredients'].apply(safe_eval_list)
        df['steps'] = df['steps'].apply(safe_eval_list)
        for optional_col in ('tags', 'nutrition'):
            if optional_col in df.columns:
                df[optional_col] = df[optional_col].apply(safe_eval_list)
            else:
                # Column absent from this data source: treat as "no values".
                df[optional_col] = [[] for _ in range(len(df))]

        # Clean data. The index is reset so labels equal row positions, which
        # is what the TF-IDF matrix built below is aligned to.
        df = df[
            (df['name'].str.len() > 1)
            & (df['minutes'] > 0)
            & (df['ingredients'].str.len() > 0)
            & (df['steps'].str.len() > 0)
        ].reset_index(drop=True)

        # Create searchable text fields
        df['ingredients_text'] = df['ingredients'].apply(lambda x: ' '.join(x).lower())
        df['steps_text'] = df['steps'].apply(lambda x: ' '.join(x).lower())
        df['tags_text'] = df['tags'].apply(lambda x: ' '.join(x).lower())

        if 'description' in df.columns:
            description_text = df['description'].fillna('').astype(str).str.lower()
        else:
            description_text = pd.Series('', index=df.index)

        df['search_text'] = (
            df['name'].str.lower() + ' '
            + df['ingredients_text'] + ' '
            + df['tags_text'] + ' '
            + description_text
        )

        # Create TF-IDF vectors for semantic search
        print("🔍 Building search index...")
        new_vectorizer = TfidfVectorizer(
            max_features=5000,
            stop_words='english',
            ngram_range=(1, 2),
            min_df=2
        )
        new_vectors = new_vectorizer.fit_transform(df['search_text'])

        # Publish everything at once so readers never see a half-built index.
        vectorizer = new_vectorizer
        recipe_vectors = new_vectors
        recipes_df = df
        print(f"✅ Loaded {len(df)} recipes successfully!")

    except Exception as e:
        print(f"❌ Error loading recipes: {e}")
        print(f"📍 Error details: {type(e).__name__}: {str(e)}")
        raise Exception(f"Failed to load recipe database: {e}") from e


async def get_usda_food_suggestions(query_text, limit=5):
    """Use USDA FoodData Central API to intelligently understand food terms.

    Args:
        query_text: Free-text user query.
        limit: ``pageSize`` requested from the API.

    Returns:
        Up to three lower-cased food descriptions, or ``[]`` when the query has
        no usable words, the API answers with a non-200 status, or the request
        fails for any reason (best effort — never raises).
    """
    try:
        # Clean the query to extract potential food terms
        food_words = [
            word for word in query_text.lower().split()
            if word not in _USDA_QUERY_STOPWORDS
        ]
        if not food_words:
            return []

        # Search USDA database for food items
        search_term = ' '.join(food_words[:2])  # Use first 2 meaningful words
        url = "https://api.nal.usda.gov/fdc/v1/foods/search"
        params = {
            'query': search_term,
            'dataType': ['Foundation', 'SR Legacy'],  # Most comprehensive data
            'pageSize': limit,
            # Free demo key by default; override with USDA_API_KEY.
            'api_key': os.environ.get('USDA_API_KEY', 'DEMO_KEY'),
        }

        # Bound the call so a stalled USDA endpoint cannot hang the request.
        timeout = aiohttp.ClientTimeout(total=_HTTP_TIMEOUT_SECONDS)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(url, params=params) as response:
                if response.status != 200:
                    print(f"⚠️ USDA API error: {response.status}")
                    return []
                data = await response.json()

        food_suggestions = []
        for food in data.get('foods', []):
            description = food.get('description', '').lower()
            # Extract meaningful food terms from USDA descriptions
            if description:
                food_suggestions.append(description)

        print(f"🥗 USDA found: {food_suggestions[:3]}")
        return food_suggestions[:3]  # Return top 3 matches

    except Exception as e:
        print(f"⚠️ USDA API failed: {e}")
        return []


@torch.inference_mode()
def _generate_llm_terms(query_text):
    """Ask DialoGPT about the query and return up to two candidate terms.

    Synchronous on purpose: ``torch.inference_mode`` only covers code that
    runs inside the decorated call, which is not the case for a coroutine.
    """
    conversation = f"User: I want to cook {query_text}".strip()
    inputs = tokenizer.encode(
        conversation + tokenizer.eos_token, return_tensors="pt"
    ).to(device)
    outputs = model.generate(
        inputs,
        # Single unpadded sequence, so every position is attended to.
        attention_mask=torch.ones_like(inputs),
        max_new_tokens=20,
        temperature=0.7,
        top_p=0.9,
        do_sample=True,
        pad_token_id=tokenizer.pad_token_id,
        repetition_penalty=1.2
    )
    response = tokenizer.decode(outputs[0][inputs.shape[1]:], skip_special_tokens=True)

    # Only keep words that are long enough and not obvious filler.
    terms = []
    for word in response.split():
        word_clean = word.lower().strip('.,!?')
        if len(word_clean) > 3 and word_clean not in _LLM_FILLER_WORDS:
            terms.append(word_clean)
    return terms[:2]  # Limit to 2 terms


async def extract_query_features_with_llm(query_text, preferences="", max_minutes=30):
    """Use USDA API + DialoGPT for truly intelligent food understanding.

    Returns a dict with ``original_query``, ``search_terms`` (original query,
    then USDA suggestions, then LLM terms), ``max_minutes`` and the booleans
    ``usda_enhanced`` / ``llm_enhanced``.
    """
    global tokenizer, model

    full_query = f"{query_text} {preferences}".strip()

    # Start with the original query
    base_search_terms = [full_query]

    # Get intelligent food suggestions from USDA
    usda_suggestions = await get_usda_food_suggestions(query_text)

    # If DialoGPT is available, use it for context enhancement
    llm_enhanced_terms = []
    if model is not None and tokenizer is not None:
        try:
            llm_enhanced_terms = _generate_llm_terms(query_text)
        except Exception as e:
            print(f"⚠️ DialoGPT failed: {e}")

    # Combine all intelligent suggestions
    all_search_terms = base_search_terms + usda_suggestions + llm_enhanced_terms
    print(f"🧠 Smart search terms: {all_search_terms[:5]}")

    return {
        'original_query': full_query,
        'search_terms': all_search_terms,
        'max_minutes': max_minutes,
        'usda_enhanced': len(usda_suggestions) > 0,
        'llm_enhanced': len(llm_enhanced_terms) > 0
    }


def parse_llm_json_response(response_text):
    """Parse LLM's JSON response into structured features.

    Extracts the outermost ``{...}`` span, parses it as JSON and guarantees
    that every expected feature key maps to a list. If no JSON can be parsed,
    falls back to keyword spotting in the raw text.
    """
    expected_keys = (
        'ingredients', 'meal_types', 'cuisines', 'dietary_restrictions',
        'cooking_styles', 'cooking_methods', 'flavors',
    )
    try:
        # Clean the response - remove any non-JSON text
        response_text = response_text.strip()

        # Find JSON content between braces
        start_idx = response_text.find('{')
        end_idx = response_text.rfind('}') + 1
        if start_idx == -1 or end_idx == 0:
            raise ValueError("No JSON found in response")

        features = json.loads(response_text[start_idx:end_idx])

        # Ensure all expected keys exist and hold lists
        for key in expected_keys:
            if key not in features:
                features[key] = []
            elif not isinstance(features[key], list):
                features[key] = [str(features[key])]
        return features

    except Exception as e:
        print(f"⚠️ JSON parsing failed: {e}")
        print(f"Response text: {response_text[:200]}...")

        # Fallback: extract key terms manually
        text_lower = response_text.lower()
        return {
            'ingredients': extract_terms_from_text(text_lower, ['chocolate', 'vanilla', 'sugar', 'flour', 'butter', 'eggs', 'milk']),
            'meal_types': extract_terms_from_text(text_lower, ['dessert', 'breakfast', 'lunch', 'dinner', 'snack']),
            'cuisines': extract_terms_from_text(text_lower, ['italian', 'mexican', 'asian', 'french']),
            'dietary_restrictions': extract_terms_from_text(text_lower, ['vegetarian', 'vegan', 'gluten-free']),
            'cooking_styles': extract_terms_from_text(text_lower, ['quick', 'easy', 'healthy']),
            'cooking_methods': extract_terms_from_text(text_lower, ['baked', 'fried', 'grilled']),
            'flavors': extract_terms_from_text(text_lower, ['sweet', 'savory', 'spicy'])
        }


def extract_terms_from_text(text, terms_list):
    """Helper function to extract terms from text (substring match, order kept)."""
    return [term for term in terms_list if term in text]


def apply_personalization_filters(df, request_data):
    """Apply personalization filters based on user preferences and history.

    Drops disliked recipe ids and, for the restrictions "vegetarian", "vegan"
    and "gluten-free", every recipe whose ``ingredients_text`` contains one of
    the excluded keywords (case-insensitive substring match). Unknown
    restrictions are ignored. Returns a new DataFrame; ``df`` is not modified.
    """
    filtered_df = df.copy()

    # Filter out disliked recipes
    if request_data.disliked_recipe_ids:
        filtered_df = filtered_df[~filtered_df['id'].isin(request_data.disliked_recipe_ids)]
        print(f"🚫 Filtered out {len(request_data.disliked_recipe_ids)} disliked recipes")

    # Apply dietary restrictions: gather every excluded keyword, then filter once.
    excluded_keywords = set()
    for restriction in request_data.dietary_restrictions or []:
        excluded_keywords.update(_DIETARY_EXCLUSIONS.get(restriction.lower(), []))

    if excluded_keywords:
        pattern = '|'.join(re.escape(keyword) for keyword in sorted(excluded_keywords))
        has_excluded = filtered_df['ingredients_text'].str.contains(
            pattern, case=False, na=False
        )
        filtered_df = filtered_df[~has_excluded]

    return filtered_df


def apply_personalization_ranking(df, request_data):
    """Apply personalization ranking boosts based on user preferences.

    Multiplies ``similarity`` by 1.5 once per preferred cuisine whose name
    appears (as a literal substring) in the recipe name, tags or search text.
    ``df`` is modified in place and returned.
    """
    if df.empty or not request_data:
        return df

    # Boost recipes from preferred cuisines
    for cuisine in request_data.preferred_cuisines or []:
        needle = cuisine.lower()
        if not needle:
            continue
        # regex=False: the cuisine is user input and must match literally.
        cuisine_mask = (
            df['name'].str.lower().str.contains(needle, na=False, regex=False)
            | df['tags_text'].str.contains(needle, na=False, regex=False)
            | df['search_text'].str.contains(needle, na=False, regex=False)
        )
        df.loc[cuisine_mask, 'similarity'] *= 1.5

    # Boosting recipes similar to liked ones is not implemented yet
    # (in production this would use recipe embeddings); only log the intent.
    if request_data.liked_recipe_ids:
        print(f"🎯 Liked-recipe boosting not implemented yet; {len(request_data.liked_recipe_ids)} liked recipes noted")

    return df


def search_recipes(query_features, request_data=None, top_k=10):
    """Enhanced intelligent search with personalization and conversation context.

    Args:
        query_features: Dict with ``search_terms`` and ``max_minutes`` (and
            optionally ``original_query``), as produced by
            ``extract_query_features_with_llm``.
        request_data: Optional request object used for personalization.
        top_k: Maximum number of recipes to return.

    Returns:
        A DataFrame of at most ``top_k`` recipes with a ``similarity`` column,
        best match first.
    """
    global recipes_df, vectorizer, recipe_vectors

    if recipes_df is None:
        load_recipes()

    # Filter by time constraint
    filtered_df = recipes_df[recipes_df['minutes'] <= query_features['max_minutes']].copy()
    if len(filtered_df) == 0:
        filtered_df = recipes_df.copy()  # Fall back to all recipes

    # Apply personalization filters if available
    if request_data:
        filtered_df = apply_personalization_filters(filtered_df, request_data)

    # Create search query from all terms (original query + DialoGPT enhancements)
    search_query = ' '.join(query_features['search_terms'])

    if not (search_query and vectorizer is not None):
        # Fallback: random selection
        filtered_df = filtered_df.sample(min(len(filtered_df), top_k * 2), random_state=42)
        filtered_df['similarity'] = 0.5
        return filtered_df.head(top_k)

    # Semantic search using TF-IDF on the full query
    query_vector = vectorizer.transform([search_query])

    try:
        # recipe_vectors is positional, so translate index labels into row
        # positions of recipes_df rather than using the labels directly.
        positions = recipes_df.index.get_indexer(filtered_df.index)
        in_bounds = (positions >= 0) & (positions < recipe_vectors.shape[0])
        if in_bounds.any():
            kept_rows = np.flatnonzero(in_bounds)
            similarities = cosine_similarity(
                query_vector, recipe_vectors[positions[kept_rows]]
            ).flatten()
            filtered_df = filtered_df.iloc[kept_rows]
        else:
            # Nothing to score, fall back to a neutral score
            similarities = np.array([0.5] * len(filtered_df))
    except Exception as e:
        print(f"⚠️ Vector indexing error: {e}, falling back to random")
        similarities = np.array([0.5] * len(filtered_df))

    # Add similarity scores (ensure lengths match)
    filtered_df = filtered_df.copy()
    if len(similarities) == len(filtered_df):
        filtered_df['similarity'] = similarities
    else:
        print(f"⚠️ Similarity length mismatch: {len(similarities)} vs {len(filtered_df)}")
        filtered_df['similarity'] = 0.5

    # Simple boosting based on query content detection
    original_query = query_features.get('original_query', '').lower()

    # Boost for dessert-related queries
    if any(word in original_query for word in ['dessert', 'sweet', 'chocolate', 'cake', 'cookie']):
        dessert_patterns = ['chocolate', 'cake', 'cookie', 'dessert', 'sweet', 'brownie', 'pie']
        for pattern in dessert_patterns:
            mask = (
                filtered_df['name'].str.lower().str.contains(pattern, na=False, regex=False)
                | filtered_df['search_text'].str.contains(pattern, na=False, regex=False)
            )
            filtered_df.loc[mask, 'similarity'] *= 2.0

    # Boost for specific food mentions (burger, pasta, etc.)
    ignored_words = {'want', 'like', 'something', 'recipes', 'recipe'}
    food_words = [word for word in original_query.split() if len(word) > 3]
    for word in food_words:
        if word in ignored_words:
            continue
        # regex=False: query words are user input ("c++", "(spicy)") and must
        # be matched literally instead of being compiled as patterns.
        mask = (
            filtered_df['name'].str.lower().str.contains(word, na=False, regex=False)
            | filtered_df['ingredients_text'].str.contains(word, na=False, regex=False)
            | filtered_df['search_text'].str.contains(word, na=False, regex=False)
        )
        filtered_df.loc[mask, 'similarity'] *= 1.5

    # Apply personalization ranking if request data available
    if request_data:
        filtered_df = apply_personalization_ranking(filtered_df, request_data)

    # Sort by similarity (descending)
    filtered_df = filtered_df.sort_values('similarity', ascending=False)

    # Log the top results for debugging
    print(f"🔍 Search results for '{search_query}':")
    for rank, (_, recipe) in enumerate(filtered_df.head(3).iterrows(), start=1):
        print(f" {rank}. {recipe['name']} (sim: {recipe['similarity']:.3f})")

    return filtered_df.head(top_k)


# New enhanced chatbot endpoint - option selection
@app.post("/api/chatbot-options", response_model=ChatbotOptionResponse)
async def chatbot_options(request: ChatbotOptionRequest):
    """
    Enhanced chatbot that gives users option between nutrition recommendations and recipes.

    Nutrition keywords are checked before recipe keywords; input matching
    neither gets the initial greeting with both options.
    """
    try:
        user_input = request.user_input.lower().strip()

        # Check if user is asking for specific type of help
        if any(word in user_input for word in ["nutrition", "healthy", "vitamin", "mineral", "diet", "health"]):
            return ChatbotOptionResponse(
                status="success",
                response_type="nutrition",
                message="I can help you with nutrition information! What specific topic would you like to learn about?",
                options=["Vitamins & Minerals", "Heart Health", "Weight Management", "Diabetes Nutrition", "General Nutrition Tips"]
            )

        if any(word in user_input for word in ["recipe", "cook", "meal", "food", "ingredients"]):
            return ChatbotOptionResponse(
                status="success",
                response_type="recipe",
                message="I can help you find recipes! Tell me what ingredients you have or what type of meal you'd like.",
                options=["Quick Meals (15-30 min)", "Healthy Options", "Comfort Food", "Vegetarian", "Use My Ingredients"]
            )

        # Initial greeting - present both options
        return ChatbotOptionResponse(
            status="success",
            response_type="options",
            message="Hello! I'm your nutrition and recipe assistant. How can I help you today?",
            options=["🍎 Get nutrition recommendations", "🍳 Find recipe recommendations"]
        )
    except Exception as e:
        return ChatbotOptionResponse(
            status="error",
            response_type="options",
            message="Sorry, I encountered an error. Please try again.",
            error=str(e)
        )


# Nutrition information endpoint
@app.post("/api/nutrition-info", response_model=NutritionResponse)
async def get_nutrition_info(request: NutritionRequest):
    """
    Provides nutritional recommendations with trustworthy sources.

    Errors are reported in the response body (status="error") rather than
    raised.
    """
    try:
        query = request.query.lower().strip()

        # Generate nutrition response using intelligent web scraping
        nutrition_info = await generate_intelligent_nutrition_response(query)

        return NutritionResponse(
            status="success",
            topic=nutrition_info["topic"],
            summary=nutrition_info["summary"],
            key_points=nutrition_info["key_points"],
            trusted_sources=nutrition_info["sources"]
        )
    except Exception as e:
        return NutritionResponse(
            status="error",
            topic="Error",
            summary="Failed to retrieve nutrition information",
            key_points=[],
            trusted_sources=[],
            error=str(e)
        )


# User feedback endpoint for reinforcement learning
@app.post("/api/user-feedback", response_model=UserFeedbackResponse)
async def record_user_feedback(request: UserFeedbackRequest):
    """
    Records user feedback for reinforcement learning improvements.

    Currently the feedback is only logged; nothing is persisted.
    """
    try:
        # In a real implementation, this would store feedback in a database
        # For now, we'll log it and return success
        print(f"📊 User feedback: User {request.user_id} {request.feedback_type} recipe {request.recipe_id}")

        # Here you would typically:
        # 1. Store the feedback in a database
        # 2. Update user preference models
        # 3. Trigger retraining of recommendation models

        return UserFeedbackResponse(
            status="success",
            message=f"Thank you for your feedback! Your {request.feedback_type} has been recorded.",
            updated_preferences={"learning": True}
        )
    except Exception as e:
        return UserFeedbackResponse(
            status="error",
            message="Failed to record feedback",
            error=str(e)
        )


# Web scraping and content extraction
class WebScraper:
    """Fetches pages over HTTP and extracts title, headings, paragraphs and lists.

    Successful results are kept in an in-memory cache keyed by URL for
    ``cache_duration`` seconds.
    """

    def __init__(self):
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        self.cache = {}  # Simple in-memory cache: url -> (content, timestamp)
        self.cache_duration = 3600  # 1 hour cache

    async def scrape_url(self, url: str) -> dict:
        """Scrape content from a single URL.

        Returns the extracted content dict, or ``{"error": ...}`` on a
        non-200 response or any exception (never raises).
        """
        try:
            # Check cache first
            cached = self.cache.get(url)
            if cached is not None:
                cached_data, timestamp = cached
                if time.time() - timestamp < self.cache_duration:
                    return cached_data
                # Expired: drop it so stale pages do not pile up.
                self.cache.pop(url, None)

            timeout = aiohttp.ClientTimeout(total=_HTTP_TIMEOUT_SECONDS)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(url, headers=self.headers) as response:
                    if response.status != 200:
                        return {"error": f"HTTP {response.status}"}
                    html = await response.text()

            soup = BeautifulSoup(html, 'html.parser')

            # Extract meaningful content
            content = self.extract_content(soup, url)

            # Cache the result
            self.cache[url] = (content, time.time())
            return content
        except Exception as e:
            return {"error": str(e)}

    def extract_content(self, soup: BeautifulSoup, url: str) -> dict:
        """Extract meaningful content from BeautifulSoup object.

        Note: ``soup`` is modified — script/style/nav/footer/header elements
        are removed before extraction.
        """
        # Remove script and style elements
        for unwanted in soup(["script", "style", "nav", "footer", "header"]):
            unwanted.decompose()

        # Try to find the main content area
        main_content = (
            soup.find('main')
            or soup.find('article')
            or soup.find('div', class_=re.compile(r'content|main|article'))
        )
        if not main_content:
            main_content = soup.find('body')

        # Extract title
        title = ""
        title_tag = soup.find('title')
        if title_tag:
            title = title_tag.get_text().strip()

        headings = []
        paragraphs = []
        lists = []

        if main_content:
            # Get headings (h1, h2, h3)
            for heading in main_content.find_all(['h1', 'h2', 'h3']):
                heading_text = heading.get_text().strip()
                if heading_text and len(heading_text) < 200:
                    headings.append(heading_text)

            # Get paragraphs
            for p in main_content.find_all('p'):
                p_text = p.get_text().strip()
                if p_text and len(p_text) > 50:  # Filter out short paragraphs
                    paragraphs.append(p_text)

            # Extract lists (ul, ol)
            for list_tag in main_content.find_all(['ul', 'ol']):
                list_items = []
                for li in list_tag.find_all('li'):
                    li_text = li.get_text().strip()
                    if li_text and len(li_text) < 300:
                        list_items.append(li_text)
                if list_items:
                    lists.append(list_items)

        return {
            "title": title,
            "url": url,
            "domain": urlparse(url).netloc,
            "headings": headings[:10],  # Limit to first 10 headings
            "paragraphs": paragraphs[:15],  # Limit to first 15 paragraphs
            "lists": lists[:5],  # Limit to first 5 lists
            "scraped_at": time.time()
        }

    async def scrape_multiple_urls(self, urls: list) -> list:
        """Scrape multiple URLs concurrently and return only the successful results."""
        tasks = [self.scrape_url(url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Filter out exceptions and errors
        return [
            result for result in results
            if isinstance(result, dict) and "error" not in result
        ]


# Initialize scraper
web_scraper = WebScraper()


def get_trusted_urls_for_query(query: str) -> list:
    """Get relevant trusted URLs based on the query (at most three)."""
    query_lower = query.lower()
    urls = []

    # Weight loss / management
    if any(phrase in query_lower for phrase in ["lose weight", "weight loss", "weight management"]):
        urls.extend([
            "https://www.cdc.gov/healthyweight/losing_weight/index.html",
            "https://www.niddk.nih.gov/health-information/weight-management/choosing-a-safe-successful-weight-loss-program",
            "https://www.mayoclinic.org/healthy-lifestyle/weight-loss/basics/weightloss-basics/hlv-20049483"
        ])
    # Heart health
    elif any(phrase in query_lower for phrase in ["heart", "cardiovascular", "cholesterol"]):
        urls.extend([
            "https://www.heart.org/en/healthy-living/healthy-eating/eat-smart/nutrition-basics",
            "https://www.nhlbi.nih.gov/education/dash-eating-plan",
            "https://www.mayoclinic.org/diseases-conditions/heart-disease/in-depth/heart-healthy-diet/art-20047702"
        ])
    # Diabetes
    elif any(phrase in query_lower for phrase in ["diabetes", "blood sugar"]):
        urls.extend([
            "https://www.cdc.gov/diabetes/managing/eat-well.html",
            "https://www.niddk.nih.gov/health-information/diabetes/overview/diet-eating-physical-activity",
            "https://diabetes.org/food-nutrition"
        ])
    # Vitamins and supplements
    elif any(word in query_lower for word in ["vitamin", "supplement", "mineral"]):
        urls.extend([
            "https://ods.od.nih.gov/factsheets/list-all/",
            "https://www.nutrition.gov/topics/dietary-supplements",
            "https://www.mayoclinic.org/healthy-lifestyle/nutrition-and-healthy-eating/in-depth/supplements/art-20044894"
        ])
    # General nutrition
    else:
        urls.extend([
            "https://www.nutrition.gov/topics/basic-nutrition",
            "https://www.cdc.gov/nutrition/guidelines.html",
            "https://www.choosemyplate.gov/"
        ])

    return urls[:3]  # Limit to 3 URLs to avoid overwhelming the system


async def generate_intelligent_nutrition_response(query: str) -> dict:
    """Generate nutrition response by scraping and summarizing trusted sources.

    Falls back to ``generate_static_nutrition_response`` when no page could
    be scraped.
    """
    # Get relevant URLs
    trusted_urls = get_trusted_urls_for_query(query)

    # Scrape the URLs
    scraped_data = await web_scraper.scrape_multiple_urls(trusted_urls)

    if not scraped_data:
        # Fallback to static response if scraping fails
        return generate_static_nutrition_response(query)

    # Combine and summarize the scraped content
    content_chunks = []
    sources = []

    for data in scraped_data:
        # Add to sources
        sources.append({
            "title": data["title"],
            "url": data["url"],
            "domain": data["domain"],
            "credibility_score": get_credibility_score(data["domain"])
        })

        # Combine content for summarization
        content_parts = []
        content_parts.extend(data["headings"])
        content_parts.extend(data["paragraphs"][:5])  # First 5 paragraphs

        # Add list items
        for list_items in data["lists"]:
            content_parts.extend(list_items[:3])  # First 3 items from each list

        content_chunks.append(" ".join(content_parts) + " ")

    combined_content = "".join(content_chunks)

    # Generate summary using the scraped content
    summary, key_points = summarize_nutrition_content(combined_content, query)

    # Determine topic from query
    topic = determine_nutrition_topic(query)

    return {
        "topic": topic,
        "summary": summary,
        "key_points": key_points,
        "sources": sources,
        "scraped_from": len(scraped_data),
        "query_analyzed": query
    }


def get_credibility_score(domain: str) -> float:
    """Get credibility score for a domain.

    ``domain`` may be a bare domain or a full network location such as
    ``www.cdc.gov``; it is matched against the known domains by suffix, the
    most specific known domain winning. Unknown domains score 0.75.
    """
    scores = {
        "cdc.gov": 0.95,
        "nih.gov": 0.98,
        "niddk.nih.gov": 0.98,
        "nutrition.gov": 0.95,
        "mayoclinic.org": 0.90,
        "heart.org": 0.92,
        "diabetes.org": 0.93,
        "choosemyplate.gov": 0.90,
        "nhlbi.nih.gov": 0.95,
        "ods.od.nih.gov": 0.98
    }
    # Drop any port and normalise case before matching.
    host = (domain or "").lower().split(':')[0]

    best_match = None
    for known_domain in scores:
        if host == known_domain or host.endswith('.' + known_domain):
            if best_match is None or len(known_domain) > len(best_match):
                best_match = known_domain

    return scores[best_match] if best_match is not None else 0.75


def summarize_nutrition_content(content: str, query: str) -> tuple:
    """Summarize nutrition content and extract key points.

    Returns ``(summary, key_points)``: the summary joins the three sentences
    sharing the most words with the query (max 500 chars); key points are
    advice-like sentences, padded with generic advice when fewer than four
    are found, capped at six.
    """
    # Clean the content
    content = re.sub(r'\s+', ' ', content)  # Remove extra whitespace
    content = content[:3000]  # Limit content length

    # Use simple summarization for now (could use LLM later)
    sentences = content.split('.')

    # Find most relevant sentences based on query keywords
    query_words = query.lower().split()
    relevant_sentences = []

    for sentence in sentences:
        sentence = sentence.strip()
        if len(sentence) > 20:
            # Score sentence based on query word matches
            sentence_lower = sentence.lower()
            score = sum(1 for word in query_words if word in sentence_lower)
            if score > 0:
                relevant_sentences.append((score, sentence))

    # Sort by relevance and take top sentences
    relevant_sentences.sort(key=lambda x: x[0], reverse=True)

    # Create summary from top 3 relevant sentences
    summary = ". ".join(sent[1] for sent in relevant_sentences[:3])

    if not summary:
        summary = "Evidence-based nutrition information from trusted health organizations."

    # Extract key points (look for list-like content)
    advice_starters = ["eat ", "choose ", "limit ", "include ", "avoid ", "consume "]
    key_points = []
    for sentence in sentences:
        sentence = sentence.strip()
        sentence_lower = sentence.lower()
        if any(starter in sentence_lower for starter in advice_starters):
            if 20 < len(sentence) < 150:
                key_points.append(sentence.capitalize())

    # Ensure we have at least 4 key points
    if len(key_points) < 4:
        key_points.extend([
            "Eat a variety of nutrient-dense foods from all food groups",
            "Practice portion control and mindful eating",
            "Stay hydrated with water as your primary beverage",
            "Consult healthcare professionals for personalized advice"
        ])

    return summary[:500], key_points[:6]  # Limit summary and key points


def determine_nutrition_topic(query: str) -> str:
    """Determine the main nutrition topic from the query."""
    query_lower = query.lower()

    if any(phrase in query_lower for phrase in ["lose weight", "weight loss"]):
        return "Weight Loss Nutrition"
    if any(phrase in query_lower for phrase in ["gain weight", "build muscle"]):
        return "Healthy Weight Gain"
    if any(phrase in query_lower for phrase in ["heart", "cardiovascular"]):
        return "Heart-Healthy Nutrition"
    if any(phrase in query_lower for phrase in ["diabetes", "blood sugar"]):
        return "Diabetes Nutrition Management"
    if any(word in query_lower for word in ["vitamin", "supplement"]):
        return "Vitamins and Supplements"
    return "General Nutrition Guidelines"


def generate_static_nutrition_response(query: str) -> dict:
    """Fallback static response when scraping fails (``query`` is currently unused)."""
    return {
        "topic": "General Nutrition",
        "summary": "Unable to fetch current information. Please try again later.",
        "key_points": ["Consult healthcare professionals for nutrition advice"],
        "sources": []
    }


def generate_nutrition_response(query: str) -> dict:
    """
    Legacy sync wrapper for async function.

    Must not be called from a thread that is already running an event loop.
    """
    # asyncio.run creates, runs and fully tears down a fresh loop, so no
    # closed loop is left installed as the thread's current event loop.
    return asyncio.run(generate_intelligent_nutrition_response(query))


# Load model on startup
@app.on_event("startup")
async def load_model():
    """Load DialoGPT (best effort) and then the recipe database.

    A model failure only disables LLM enhancement. A recipe database failure
    propagates, as before, but no longer discards a model that did load.
    """
    global tokenizer, model

    try:
        print("🚀 Loading DialoGPT for Recipe Intelligence...")

        # Use DialoGPT-small - lightweight and great for conversational understanding
        model_name = "microsoft/DialoGPT-small"

        # Load tokenizer
        print("📚 Loading DialoGPT tokenizer...")
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token

        # Load model - much lighter than Llama 2
        print("🤖 Loading DialoGPT model (optimized for HF Spaces)...")
        model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=torch.float16 if device == "cuda" else torch.float32,
            low_cpu_mem_usage=True
        ).to(device)
        model.eval()
        print(f"✅ DialoGPT model loaded successfully on {device}!")

    except Exception as e:
        print(f"❌ Error loading DialoGPT model: {e}")
        print("Falling back to enhanced rule-based processing...")
        # Don't fail completely - we can still work with enhanced rule-based extraction
        tokenizer = None
        model = None

    # Load recipe database (independent of whether the model loaded)
    load_recipes()


# Health check endpoint
@app.get("/")
async def root():
    """Report service status, lazily loading the recipe database if needed."""
    if recipes_df is None:
        load_recipes()

    return {
        "message": "🍳 Recipe AI Assistant API v2.0",
        "status": "healthy",
        "model_loaded": model is not None,
        "recipes_loaded": recipes_df is not None,
        "recipe_count": len(recipes_df) if recipes_df is not None else 0,
        "device": device,
        "current_directory": os.getcwd(),
        "available_files": [f for f in os.listdir('.') if f.endswith('.csv')][:5]
    }


# Debug endpoint to check recipe database content
@app.get("/debug/search/{query}")
async def debug_search_recipes(query: str):
    """Debug endpoint to check if specific terms exist in recipe database.

    Counts rows whose ``name`` (lower-cased), ``ingredients_text`` and
    ``search_text`` columns contain the lower-cased *query*, and returns up
    to five example recipe names for each.

    The query is matched as a literal substring (``regex=False``) so that
    user input containing regex metacharacters such as ``(`` or ``+``
    cannot raise a pattern error.

    Raises:
        HTTPException: 503 when the recipe database is still unavailable
            after attempting to load it.
    """
    if recipes_df is None:
        load_recipes()
    if recipes_df is None:
        raise HTTPException(status_code=503, detail="Recipe database is not loaded")

    query_lower = query.lower()

    def _rows_containing(series):
        # Literal (non-regex) containment; missing values never match.
        return recipes_df[series.str.contains(query_lower, na=False, regex=False)]

    def _summary(matches):
        # head(5) on an empty frame already yields an empty list.
        return {
            "count": len(matches),
            "examples": matches['name'].head(5).tolist()
        }

    # Search in recipe names
    name_matches = _rows_containing(recipes_df['name'].str.lower())
    # Search in ingredients
    ingredient_matches = _rows_containing(recipes_df['ingredients_text'])
    # Search in all searchable text
    full_text_matches = _rows_containing(recipes_df['search_text'])

    return {
        "query": query,
        "total_recipes": len(recipes_df),
        "name_matches": _summary(name_matches),
        "ingredient_matches": _summary(ingredient_matches),
        "full_text_matches": _summary(full_text_matches)
    }


# Health check endpoint
@app.get("/health")
async def health_check():
    """Report model/recipe load status without triggering any loading."""
    return {
        "status": "healthy",
        "model_status": "loaded" if model is not None else "not_loaded",
        "recipes_status": "loaded" if recipes_df is not None else "not_loaded",
        "recipe_count": len(recipes_df) if recipes_df is not None else 0,
        "device": device
    }


# Keys of the nutrition dict, in the positional order the values are read.
_NUTRITION_FIELDS = (
    "calories", "fat", "sugar", "sodium", "protein", "saturated_fat", "carbs"
)


def _parse_nutrition(raw):
    """Turn a recipe's raw ``nutrition`` value into a labelled dict.

    Returns ``None`` when *raw* is not a non-empty list, holds fewer than
    seven values, or cannot be parsed/converted.
    """
    if not isinstance(raw, list) or len(raw) == 0:
        return None
    try:
        # A string first element is parsed as a Python literal holding the
        # values; otherwise the list itself is taken as the values.
        values = ast.literal_eval(raw[0]) if isinstance(raw[0], str) else raw
        if len(values) < len(_NUTRITION_FIELDS):
            return None
        return {
            field: float(values[idx]) if values[idx] else 0
            for idx, field in enumerate(_NUTRITION_FIELDS)
        }
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        # Malformed nutrition data is dropped rather than failing the request.
        return None


def _optional_int(value):
    """Return ``int(value)``, or ``None`` for missing/NaN/unconvertible input."""
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return None


def _recipe_row_to_model(recipe):
    """Build a ``DatabaseRecipe`` from one row of the search results."""
    nutrition = _parse_nutrition(recipe.get('nutrition'))

    # Clean the data to handle NaN values
    clean_description = recipe.get('description', '')
    if clean_description is None or pd.isna(clean_description):
        clean_description = ''

    clean_name = recipe.get('name', 'Untitled Recipe')
    if pd.isna(clean_name):
        clean_name = 'Untitled Recipe'

    # Ensure minutes is valid
    recipe_minutes = recipe.get('minutes', 30)
    if pd.isna(recipe_minutes) or recipe_minutes <= 0:
        recipe_minutes = 30

    # Use avg_rating as confidence (a 0-5 rating divided by 5). A missing or
    # NaN rating falls back to 4.5 so the value stays JSON-serialisable.
    avg_rating = recipe.get('avg_rating', 4.5)
    if avg_rating is None or pd.isna(avg_rating):
        avg_rating = 4.5
    recipe_confidence = float(avg_rating) / 5.0

    # NOTE(review): falling back to n_ingredients for servings is kept from
    # the original code - confirm this is intended.
    servings = _optional_int(recipe.get('servings', recipe.get('n_ingredients', 4)))

    return DatabaseRecipe(
        id=int(recipe['id']),
        name=str(clean_name),
        description=str(clean_description),
        ingredients=recipe['ingredients'],
        steps=recipe['steps'],
        minutes=int(recipe_minutes),
        servings=servings,
        nutrition=nutrition,
        tags=recipe['tags'],
        confidence=recipe_confidence
    )


# Main recipe recommendation endpoint
@app.post("/api/recipe-suggestions", response_model=RecipeResponse)
async def get_recipe_suggestions(request: RecipeRequest):
    """Return up to five recipe recommendations for *request*.

    Query features are extracted from the ingredients, preferences and time
    limit, passed to ``search_recipes`` together with the request, and each
    resulting row is converted to a ``DatabaseRecipe``.

    Raises:
        HTTPException: 500 carrying the error text for any unexpected
            failure; an ``HTTPException`` raised further down is passed
            through unchanged.
    """
    try:
        if recipes_df is None:
            load_recipes()

        print(f"📥 Recipe request: {request.ingredients}, prefs: {request.preferences}, time: {request.max_minutes}")

        # Use USDA API + LLM for intelligent feature extraction
        query_features = await extract_query_features_with_llm(
            request.ingredients,
            request.preferences,
            request.max_minutes
        )

        # Search for matching recipes with personalization
        matching_recipes = search_recipes(query_features, request_data=request, top_k=5)

        # Convert to response format
        recommendations = [
            _recipe_row_to_model(recipe)
            for _, recipe in matching_recipes.iterrows()
        ]

        return RecipeResponse(
            status="success",
            recommendations=recommendations,
            query=request
        )
    except HTTPException:
        # Keep deliberate HTTP errors (status code and detail) intact.
        raise
    except Exception as e:
        print(f"❌ Error generating recommendations: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e


if __name__ == "__main__":
    port = int(os.environ.get("PORT", 7860))
    uvicorn.run(
        "app:app",
        host="0.0.0.0",
        port=port,
        reload=False
    )