Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Group 5 Pattern Recognition Project - Deployment Version | |
| ======================================================= | |
| Recipe Recommendation System with Google Drive file loading for deployment. | |
| Optimized for Hugging Face Spaces or similar platforms. | |
| """ | |
| import gradio as gr | |
| import torch | |
| from transformers import BertTokenizer, BertModel | |
| import pickle | |
| import os | |
| import csv | |
| from typing import List, Dict | |
| import time | |
| import ast | |
| import requests | |
| import gdown | |
| from pathlib import Path | |
# Maps each required local file name to the Google Drive file ID it is
# downloaded from. Replace the IDs with those of your own uploads when
# deploying with different artifacts.
GOOGLE_DRIVE_FILES = dict((
    ('torch_recipe_embeddings_231630.pt', '1PSidY1toSfgECXDxa4pGza56Jq6vOq6t'),
    ('tag_based_bert_model.pth', '1LBl7yFs5JFqOsgfn88BF9g83W9mxiBm6'),
    ('RAW_recipes.csv', '1rFJQzg_ErwEpN6WmhQ4jRyiXv6JCINyf'),
    ('recipe_statistics_231630.pkl', '1n8TNT-6EA_usv59CCCU1IXqtuM7i084E'),
    ('recipe_scores_231630.pkl', '1gfPBzghKHOZqgJu4VE9NkandAd6FGjrA'),
))
def download_file_from_drive(file_id: str, destination: str) -> bool:
    """Download one file from Google Drive to *destination*.

    Args:
        file_id: Google Drive file ID used to build the download URL.
        destination: Local path the file is written to.

    Returns:
        True when gdown reported an output path and the file exists on disk,
        False otherwise. Errors are printed, never raised.
    """
    print(f"π₯ Downloading {destination}...")
    url = f"https://drive.google.com/uc?id={file_id}"
    try:
        output = gdown.download(url, destination, quiet=False)
    except Exception as e:
        print(f"β Error downloading {destination}: {e}")
        return False

    # Some gdown versions signal failure by returning None instead of
    # raising, so a missing return value or missing file is also a failure.
    if output is None or not os.path.exists(destination):
        print(f"β Error downloading {destination}: no file was written")
        return False
    return True
def ensure_files_downloaded():
    """Fetch every file listed in GOOGLE_DRIVE_FILES that is not on disk yet.

    Files that already exist are left alone, and entries still carrying the
    placeholder ID are reported and skipped.

    Returns:
        False as soon as one download fails, True otherwise.
    """
    print("π Checking required files...")
    for filename, file_id in GOOGLE_DRIVE_FILES.items():
        if os.path.exists(filename):
            continue

        if file_id == 'YOUR_EMBEDDINGS_FILE_ID_HERE':
            print(f"β οΈ {filename} not configured for download")
            continue

        print(f"π₯ Downloading {filename} from Google Drive...")
        if download_file_from_drive(file_id, filename):
            continue

        print(f"β Failed to download {filename}")
        return False

    print("β All files ready!")
    return True
class DeployableRecipeSearch:
    """
    Deployment-ready recipe search system.

    Embeds a free-text query with a BERT model, compares it by cosine
    similarity against pre-computed recipe embeddings and renders the best
    matches as HTML. Row ``i`` of the embedding tensor is looked up as
    ``self.recipes[i]`` (presumably the embeddings were produced from the
    same filtered CSV order -- verify against the embedding pipeline).
    """

    # Upper bound on the number of recipes read from the CSV.
    MAX_RECIPES = 231630

    def __init__(self):
        """Download the required files, load the model and all recipe data.

        ``self.is_ready`` tells callers whether searching is possible; it
        stays False when a download fails or data is missing.
        """
        # Set first so the attribute exists on every exit path.
        self.is_ready = False

        print("π Initializing Recipe Search System...")
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        print(f"π± Device: {self.device}")

        # Ensure files are downloaded
        if not ensure_files_downloaded():
            print("β Failed to download required files")
            return

        # Load tokenizer and model
        self.tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
        self.model = BertModel.from_pretrained('bert-base-uncased')

        # Load trained model if available
        if os.path.exists('tag_based_bert_model.pth'):
            print("π§ Loading trained BERT model...")
            # NOTE(security): torch.load unpickles the file; only load
            # checkpoints from a source you trust.
            self.model.load_state_dict(
                torch.load('tag_based_bert_model.pth', map_location=self.device)
            )
            print("β Trained model loaded!")
        else:
            print("β οΈ Using pre-trained BERT")

        self.model.to(self.device)
        self.model.eval()

        # Load data
        self.load_data()
        # Only announce readiness when load_data() actually succeeded.
        if self.is_ready:
            print("π Recipe Search System ready!")

    @staticmethod
    def _escape(value) -> str:
        """Return *value* as text that is safe to embed in HTML."""
        # Local import keeps this class independent of the module header.
        from html import escape
        return escape(str(value), quote=True)

    def safe_literal_eval(self, text):
        """Convert a CSV cell into a list without ever raising.

        Lists are returned unchanged. Strings of the form ``[...]`` are
        parsed with ``ast.literal_eval``; other strings are split on commas.
        Empty values, ``'nan'`` (any case), unparsable literals and any
        other type yield ``[]``.
        """
        if isinstance(text, list):
            return text
        if not isinstance(text, str):
            return []

        stripped = text.strip()
        if not stripped or stripped.lower() == 'nan':
            return []

        if stripped.startswith('[') and stripped.endswith(']'):
            try:
                parsed = ast.literal_eval(stripped)
            except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
                return []
            return parsed if isinstance(parsed, list) else []

        return [item.strip() for item in stripped.split(',') if item.strip()]

    def safe_int(self, value):
        """Convert *value* to int via float, returning 0 when that fails."""
        try:
            return int(float(value))
        except (TypeError, ValueError, OverflowError):
            # OverflowError covers infinities, ValueError covers NaN/garbage.
            return 0

    def load_data(self):
        """Load embeddings, recipes, statistics and scores.

        Sets ``self.is_ready`` to True only when all four data sets are
        present and non-empty.
        """
        # Load PyTorch embeddings
        embeddings_file = 'torch_recipe_embeddings_231630.pt'
        if not os.path.exists(embeddings_file):
            print(f"β Embeddings not found")
            self.is_ready = False
            return

        print(f"π₯ Loading embeddings...")
        # NOTE(security): torch.load unpickles the file; trusted input only.
        self.recipe_embeddings = torch.load(embeddings_file, map_location=self.device)
        print(f"β Loaded {self.recipe_embeddings.shape[0]} embeddings")

        # Load recipes from CSV
        self.load_recipes_from_csv()

        # Load statistics and scores
        self.load_statistics_and_scores()

        # Check if we have everything we need
        self.is_ready = all([
            self.recipe_embeddings is not None,
            len(self.recipes) > 0,
            len(self.recipe_stats) > 0,
            len(self.recipe_scores) > 0
        ])

        if self.is_ready:
            self.fix_recipe_id_mismatches()
            print("π― All data loaded successfully!")
        else:
            print("β οΈ Some data missing")

    def load_recipes_from_csv(self):
        """Read ``RAW_recipes.csv`` into ``self.recipes``.

        Rows without a usable name, tags or ingredients are skipped, as are
        rows whose id is not an integer. Reading stops after
        ``MAX_RECIPES`` accepted rows.
        """
        print("π Loading recipes from CSV...")
        self.recipes = []

        if not os.path.exists('RAW_recipes.csv'):
            print("β RAW_recipes.csv not found")
            return

        valid_recipes = []
        skipped_bad_id = 0
        # newline='' is what the csv module expects so that newlines inside
        # quoted fields are preserved.
        with open('RAW_recipes.csv', 'r', encoding='utf-8', newline='') as file:
            for row_idx, row in enumerate(csv.DictReader(file)):
                raw_name = row.get('name', '')
                if not raw_name:
                    continue
                name = str(raw_name).lower().strip()
                if name in ('', 'nan', 'unknown recipe'):
                    continue

                tags = self.safe_literal_eval(row.get('tags', '[]'))
                if not tags:
                    continue
                ingredients = self.safe_literal_eval(row.get('ingredients', '[]'))
                if not ingredients:
                    continue

                try:
                    recipe_id = int(row.get('id', row_idx))
                except (TypeError, ValueError):
                    # Count instead of silently dropping the row.
                    skipped_bad_id += 1
                    continue

                valid_recipes.append({
                    'id': recipe_id,
                    'name': name,
                    'minutes': self.safe_int(row.get('minutes', 0)),
                    'tags': tags,
                    'ingredients': ingredients,
                    'n_steps': self.safe_int(row.get('n_steps', 0)),
                    # A missing cell becomes '' rather than the text 'None'.
                    'description': str(row.get('description') or '').strip()
                })
                if len(valid_recipes) >= self.MAX_RECIPES:
                    break

        self.recipes = valid_recipes
        if skipped_bad_id:
            print(f"β οΈ Skipped {skipped_bad_id} rows with an invalid id")
        print(f"β Loaded {len(self.recipes)} recipes")

    def _load_pickle_or_default(self, path, label, default):
        """Load a pickled mapping, or build ``{recipe id: default}``.

        The fallback is used when *path* does not exist or loading fails;
        *label* only affects the printed messages.
        """
        try:
            if os.path.exists(path):
                # NOTE(security): pickle.load executes code embedded in the
                # file; only use files from a source you trust.
                with open(path, 'rb') as f:
                    loaded = pickle.load(f)
                print(f"β Loaded {label} for {len(loaded)} recipes")
                return loaded
        except Exception as e:
            # Deliberately broad: any failure falls back to defaults.
            print(f"β οΈ {label.capitalize()} loading failed: {e}")
        return {recipe['id']: default for recipe in self.recipes}

    def load_statistics_and_scores(self):
        """Load recipe statistics and scores, with per-recipe defaults.

        Defaults are ``(4.0, 10, 5)`` for statistics (unpacked elsewhere as
        average rating, number of ratings, unique users) and ``0.5`` for
        scores.
        """
        self.recipe_stats = self._load_pickle_or_default(
            'recipe_statistics_231630.pkl', 'statistics', (4.0, 10, 5)
        )
        self.recipe_scores = self._load_pickle_or_default(
            'recipe_scores_231630.pkl', 'scores', 0.5
        )

    def fix_recipe_id_mismatches(self):
        """Drop statistics and scores whose recipe id was not loaded."""
        loaded_recipe_ids = {recipe['id'] for recipe in self.recipes}

        # Filter statistics
        original_stats_count = len(self.recipe_stats)
        self.recipe_stats = {
            recipe_id: stats for recipe_id, stats in self.recipe_stats.items()
            if recipe_id in loaded_recipe_ids
        }

        # Filter scores
        original_scores_count = len(self.recipe_scores)
        self.recipe_scores = {
            recipe_id: score for recipe_id, score in self.recipe_scores.items()
            if recipe_id in loaded_recipe_ids
        }

        print(f"π§ Aligned data: Stats {original_stats_count}β{len(self.recipe_stats)}, Scores {original_scores_count}β{len(self.recipe_scores)}")

    def search_recipes(self, query: str, num_results: int = 5, min_rating: float = 3.0) -> str:
        """Search for recipes and return formatted HTML results.

        Args:
            query: Free-text search string.
            num_results: Maximum number of recipes to return. The top
                ``3 * num_results`` most similar embeddings are considered.
            min_rating: Recipes with a lower average rating are skipped.

        Returns:
            An HTML fragment: the result cards, or a coloured notice when
            the system is not ready, the query is empty, nothing matched or
            an error occurred. Exceptions are reported, never raised.
        """
        if not self.is_ready:
            return """
            <div style="color: red; padding: 20px; border: 1px solid red; border-radius: 5px;">
            β Search system not ready - files may still be downloading
            </div>
            """

        # `not query` also covers None coming from the UI.
        if not query or not query.strip():
            return """
            <div style="color: orange; padding: 20px; border: 1px solid orange; border-radius: 5px;">
            β οΈ Please enter a search query
            </div>
            """

        try:
            start_time = time.time()

            # Tokenize query
            inputs = self.tokenizer(
                query, return_tensors='pt', truncation=True,
                max_length=128, padding='max_length'
            ).to(self.device)

            # Get query embedding: hidden state at sequence position 0.
            with torch.no_grad():
                outputs = self.model(**inputs)
            query_embedding = outputs.last_hidden_state[:, 0, :].reshape(1, -1)
            # The recipe embeddings live on self.device, so the query must
            # be moved there; forcing it to CPU breaks torch.mm on CUDA.
            query_embedding = query_embedding.to(
                device=self.recipe_embeddings.device,
                dtype=self.recipe_embeddings.dtype,
            )

            # Calculate cosine similarities
            recipe_embeddings_normalized = torch.nn.functional.normalize(self.recipe_embeddings, p=2, dim=1)
            query_embedding_normalized = torch.nn.functional.normalize(query_embedding, p=2, dim=1)
            similarities = torch.mm(recipe_embeddings_normalized, query_embedding_normalized.t()).flatten()

            # Over-fetch so the rating filter can still fill num_results.
            top_indices = torch.argsort(similarities, descending=True)[:num_results * 3]

            results = []
            for idx in top_indices:
                if len(results) >= num_results:
                    break

                embedding_idx = idx.item()
                if embedding_idx >= len(self.recipes):
                    continue
                recipe = self.recipes[embedding_idx]
                recipe_id = recipe['id']
                if recipe_id not in self.recipe_stats:
                    continue

                avg_rating, num_ratings, unique_users = self.recipe_stats[recipe_id]
                if avg_rating < min_rating:
                    continue

                similarity_score = similarities[embedding_idx].item()
                popularity_score = self.recipe_scores.get(recipe_id, 0.0)
                combined_score = 0.7 * similarity_score + 0.3 * popularity_score

                results.append({
                    'name': recipe['name'],
                    'ingredients': recipe['ingredients'][:8] if isinstance(recipe['ingredients'], list) else [],
                    'tags': recipe['tags'][:6] if isinstance(recipe['tags'], list) else [],
                    'minutes': recipe.get('minutes', 0),
                    'n_steps': recipe.get('n_steps', 0),
                    'similarity_score': similarity_score,
                    'popularity_score': popularity_score,
                    'combined_score': combined_score,
                    'avg_rating': avg_rating,
                    'num_ratings': num_ratings,
                    'recipe_id': recipe_id
                })

            search_time = time.time() - start_time

            if results:
                return self.format_results(query, results, search_time)
            return f"""
            <div style="color: orange; padding: 20px; border: 1px solid orange; border-radius: 5px;">
            π No recipes found for "{self._escape(query)}" with rating β₯ {min_rating}
            </div>
            """
        except Exception as e:
            # UI boundary: show the failure instead of crashing the app.
            return f"""
            <div style="color: red; padding: 20px; border: 1px solid red; border-radius: 5px;">
            β Search error: {self._escape(e)}
            </div>
            """

    def format_results(self, query: str, results: List[Dict], search_time: float) -> str:
        """Format search results as HTML.

        Args:
            query: The search string, shown in the heading.
            results: Result dicts as built by ``search_recipes``.
            search_time: Elapsed search time in seconds.

        Returns:
            One HTML string with a heading followed by one card per recipe.
            All query- and recipe-derived text is HTML-escaped.
        """
        escape = self._escape
        parts = [f"""
        <div style="margin-bottom: 20px;">
        <h2 style="color: #2E8B57;">π― Found {len(results)} recipes for "{escape(query)}"</h2>
        <p style="color: #666;">β‘ Search completed in {search_time:.2f}s</p>
        </div>
        """]

        for i, recipe in enumerate(results, 1):
            ingredients = recipe['ingredients']
            ingredients_text = ', '.join(str(item) for item in ingredients) if ingredients else "No ingredients listed"
            if len(ingredients_text) > 150:
                ingredients_text = ingredients_text[:150] + "..."
            # Escape after truncating so an entity is never cut in half.
            ingredients_text = escape(ingredients_text)

            tags = recipe['tags']
            tags_html = ' '.join(
                f'<span style="background: #e3f2fd; padding: 2px 6px; border-radius: 12px; font-size: 0.8em; margin: 2px;">{escape(tag)}</span>'
                for tag in tags
            ) if tags else ""

            time_text = f"{recipe['minutes']} min" if recipe['minutes'] > 0 else "Time not specified"

            parts.append(f"""
            <div style="border: 1px solid #ddd; border-radius: 8px; padding: 15px; margin: 15px 0; background: linear-gradient(135deg, #f8f9fa, #ffffff);">
            <h3 style="color: #1976d2; margin-bottom: 10px;">{i}. {escape(recipe['name'])}</h3>
            <div style="margin: 8px 0;">
            <strong>β±οΈ {time_text}</strong> |
            <strong>π₯ {recipe['n_steps']} steps</strong> |
            <strong>β {recipe['avg_rating']:.1f}/5.0</strong> ({recipe['num_ratings']} ratings)
            </div>
            <div style="margin: 8px 0;">
            <span style="background: #4caf50; color: white; padding: 2px 8px; border-radius: 12px; font-size: 0.8em; margin-right: 5px;">
            Match: {recipe['similarity_score']:.1%}
            </span>
            <span style="background: #ff9800; color: white; padding: 2px 8px; border-radius: 12px; font-size: 0.8em;">
            Score: {recipe['combined_score']:.1%}
            </span>
            </div>
            <div style="margin: 10px 0;">
            {tags_html}
            </div>
            <div style="margin: 10px 0; color: #555;">
            <strong>π₯ Ingredients:</strong><br>
            {ingredients_text}
            </div>
            </div>
            """)

        return ''.join(parts)
# Build the shared search backend once at import time. It stays None when
# construction raises, which the UI callback checks before searching.
print("π Initializing deployment-ready recipe search system...")
search_system = None
try:
    search_system = DeployableRecipeSearch()
except Exception as init_error:
    print(f"β Initialization failed: {init_error}")
def search_interface(query, num_results, min_rating):
    """Gradio callback: run a search and return the HTML to display.

    The slider values are coerced to ``int`` / ``float`` before being
    passed on. An error banner is returned when the backend failed to
    initialize.
    """
    if search_system is not None:
        wanted = int(num_results)
        rating_floor = float(min_rating)
        return search_system.search_recipes(query, wanted, rating_floor)
    return "<div style='color: red;'>β System initialization failed</div>"
# Static text shown by the interface.
_HEADER_MARKDOWN = """
# π½οΈ Group 5 Pattern Recognition Project
### Advanced Recipe Recommendation using Semantic Search
"""

_RESULTS_PLACEHOLDER = """
<div style="text-align: center; padding: 40px; color: #666;">
<h3>π Ready to Search</h3>
<p>Enter a search query and click "Search Recipes" to see results.</p>
</div>
"""


def _fill_query(text):
    """Return a zero-argument callback that yields *text*."""
    return lambda: text


# Create Gradio interface: controls in the left column, results on the right.
with gr.Blocks(title="Group 5 Pattern Recognition Project", theme=gr.themes.Soft()) as demo:
    gr.Markdown(_HEADER_MARKDOWN)

    with gr.Row():
        with gr.Column(scale=1):
            query_input = gr.Textbox(
                label="π Search for recipes",
                placeholder="e.g., 'chicken pasta', 'vegetarian salad', 'chocolate dessert'",
                lines=1,
            )
            with gr.Row():
                num_results = gr.Slider(minimum=1, maximum=10, value=5, step=1, label="Results")
                min_rating = gr.Slider(minimum=1.0, maximum=5.0, value=3.0, step=0.1, label="Min Rating")
            search_btn = gr.Button("Search Recipes", variant="primary")

            # Example buttons
            with gr.Row():
                ex1 = gr.Button("π Chicken Pasta", size="sm")
                ex2 = gr.Button("π₯ Healthy Salad", size="sm")
                ex3 = gr.Button("π« Chocolate Dessert", size="sm")

        with gr.Column(scale=1):
            results_output = gr.HTML(_RESULTS_PLACEHOLDER)

    # Event handlers: the button and pressing Enter trigger the same search.
    search_inputs = [query_input, num_results, min_rating]
    search_btn.click(fn=search_interface, inputs=search_inputs, outputs=results_output)
    query_input.submit(fn=search_interface, inputs=search_inputs, outputs=results_output)

    # Example buttons only fill in the query box.
    ex1.click(_fill_query("chicken pasta"), outputs=query_input)
    ex2.click(_fill_query("healthy salad"), outputs=query_input)
    ex3.click(_fill_query("chocolate dessert"), outputs=query_input)

if __name__ == "__main__":
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,  # Standard port for Hugging Face Spaces
        "share": False,
    }
    demo.launch(**launch_options)