Spaces:
Sleeping
Sleeping
"""
Recipe Processor Module

This module provides functionality for processing recipes, including
summarizing text and extracting ingredients.
"""
| import json | |
| import logging | |
| import re | |
| import time | |
| import hashlib | |
| import numpy as np | |
| from collections import Counter | |
# Configure the root logger at import time: INFO threshold, with each record
# rendered as "<timestamp> - <level name> - <message>".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
class RecipeProcessor:
    """
    Process recipes, summarizing text and extracting ingredients.

    The processor owns one ``ExtractiveRecipeSummarizer`` (for the description
    and instructions text) and one ``IngredientProcessor`` (for the ingredient
    list) and merges their output into a single result dict.
    """

    def __init__(self):
        """Initialize the recipe processor and its two helper components."""
        # Text summarizer (keeps its own result cache)
        self.summarizer = ExtractiveRecipeSummarizer()
        # Ingredient parser
        self.ingredient_processor = IngredientProcessor()
        logging.info("Initialized RecipeProcessor")

    def _create_cache_key(self, text):
        """
        Create a deterministic cache key for text.

        Args:
            text: Text to derive the key from

        Returns:
            str or None: Hex MD5 digest of the UTF-8 encoded text, or None
            when the text is empty
        """
        if not text:
            return None
        # MD5 is used purely as a content fingerprint here, not for security.
        return hashlib.md5(text.encode('utf-8')).hexdigest()

    @staticmethod
    def _as_text(value):
        """Coerce a recipe field (string, list of steps, other) to one string."""
        if isinstance(value, str):
            return value
        if isinstance(value, (list, tuple)):
            return ' '.join(str(item) for item in value)
        return str(value)

    def _summarize_field(self, recipe_data, field, max_sentences, min_sentences):
        """Summarize one text field; return None when the field is missing or empty."""
        raw_value = recipe_data.get(field, "")
        if not raw_value:
            return None
        # Non-string values (e.g. a list of steps) are flattened before being
        # hashed and summarized; "original" still reports the value as given.
        text = self._as_text(raw_value)
        return {
            "original": raw_value,
            "summarized": self.summarizer.summarize(
                text,
                max_sentences=max_sentences,
                min_sentences=min_sentences,
                cache_key=self._create_cache_key(text),
            ),
        }

    def process_recipe(self, recipe_data, summarize=True):
        """
        Process a recipe, summarizing text and extracting ingredients.

        Args:
            recipe_data: Recipe data (dict or JSON string encoding an object)
            summarize: Whether to summarize description and instructions

        Returns:
            dict: On success, a dict with keys "recipe" (the recipe name),
            "processed" (optional "description" and "instructions" entries,
            plus "ingredients") and "processing_time". On bad input, a dict
            with a single "error" key.
        """
        # Handle JSON string input
        if isinstance(recipe_data, str):
            try:
                recipe_data = json.loads(recipe_data)
            except json.JSONDecodeError:
                logging.error("Invalid JSON recipe data")
                return {"error": "Invalid JSON recipe data"}

        # Valid JSON can still be a list/number/null, and callers may pass
        # None; report that the same way as malformed JSON instead of
        # crashing on ``.get`` below.
        if not isinstance(recipe_data, dict):
            logging.error("Recipe data must be a dict or JSON object")
            return {"error": "Recipe data must be a dict or JSON object"}

        # Build a fresh result; the input dict is never modified
        result = {
            "recipe": recipe_data.get("name", ""),
            "processed": {}
        }

        # Track timing
        start_time = time.time()

        # 1. Summarize description and instructions if requested
        if summarize:
            description = self._summarize_field(
                recipe_data, "description", max_sentences=5, min_sentences=2
            )
            if description is not None:
                result["processed"]["description"] = description

            instructions = self._summarize_field(
                recipe_data, "instructions", max_sentences=8, min_sentences=3
            )
            if instructions is not None:
                result["processed"]["instructions"] = instructions

        # 2. Extract and process ingredients
        result["processed"]["ingredients"] = (
            self.ingredient_processor.extract_from_recipe(recipe_data)
        )

        # Add timing information
        result["processing_time"] = f"{time.time() - start_time:.4f} seconds"
        return result
class ExtractiveRecipeSummarizer:
    """
    Fast extractive text summarization for recipe descriptions and instructions.

    Sentences are scored by normalized word frequency (with small bonuses for
    digits and cooking verbs) and the top-scoring ones are returned in their
    original order. Results can be cached under a caller-supplied key.
    """

    # Words ignored when computing word frequencies.
    _STOPWORDS = frozenset([
        'the', 'a', 'an', 'and', 'or', 'but', 'is', 'are', 'in', 'to', 'for',
        'of', 'with', 'by', 'on', 'at', 'from', 'it', 'this', 'that', 'as',
        'be', 'has', 'have', 'had', 'was', 'were', 'will', 'would', 'could',
        'should', 'can', 'may', 'might', 'must', 'i', 'you', 'he', 'she',
        'they', 'we', 'their', 'your', 'my', 'our'
    ])

    # Substrings that earn a sentence a small score bonus.
    _RECIPE_KEYWORDS = (
        'recipe', 'cook', 'prepare', 'heat', 'mix', 'stir', 'bake',
        'simmer', 'boil', 'fry', 'chop', 'slice', 'serve', 'add'
    )

    def __init__(self, max_cache_size=1000):
        """
        Initialize the summarizer.

        Args:
            max_cache_size: Maximum number of items to store in cache; a value
                of 0 (or less) disables caching
        """
        self.max_cache_size = max_cache_size
        self.summarization_cache = {}
        logging.info("Initialized ExtractiveRecipeSummarizer")

    def _calculate_sentence_scores(self, sentences, top_words=None):
        """
        Calculate importance scores for sentences based on word frequency.

        Args:
            sentences: List of sentences
            top_words: Optional list of important words to prioritize

        Returns:
            List of sentence scores (floats), one per input sentence
        """
        # Word frequencies over the whole text, stopwords excluded
        all_words = ' '.join(sentences).lower().split()
        word_frequencies = Counter(
            word for word in all_words if word not in self._STOPWORDS
        )

        # Normalize so the most frequent word has weight 1.0
        max_frequency = max(word_frequencies.values()) if word_frequencies else 1
        normalized_frequencies = {
            word: freq / max_frequency
            for word, freq in word_frequencies.items()
        }

        # Prioritize top_words if provided
        if top_words:
            for word in top_words:
                if word.lower() in normalized_frequencies:
                    normalized_frequencies[word.lower()] *= 1.5

        sentence_scores = []
        for sentence in sentences:
            lowered = sentence.lower()
            words = lowered.split()
            # The +1 dampens very short sentences and avoids division by zero
            score = sum(
                normalized_frequencies.get(word, 0) for word in words
            ) / (len(words) + 1)
            # Bonus for sentences containing numerical values (often important in recipes)
            if any(char.isdigit() for char in sentence):
                score *= 1.2
            # Bonus for sentences with key recipe words (substring match)
            if any(keyword in lowered for keyword in self._RECIPE_KEYWORDS):
                score *= 1.1
            sentence_scores.append(score)
        return sentence_scores

    def summarize(self, text, max_sentences=5, min_sentences=2, cache_key=None):
        """
        Perform extractive summarization by selecting the most important sentences.

        Args:
            text: Text to summarize
            max_sentences: Maximum number of sentences to include
            min_sentences: Minimum number of sentences to include
            cache_key: Optional key for caching results

        Returns:
            str: Summarized text. Empty or very short input (fewer than 30
            non-blank characters) is returned unchanged.
        """
        # Skip empty or very short text
        if not text or len(text.strip()) < 30:
            return text

        # Use cache if available and requested
        if cache_key and cache_key in self.summarization_cache:
            return self.summarization_cache[cache_key]

        # Track time for performance monitoring
        start_time = time.time()

        # Clean text: drop HTML tags, collapse whitespace
        text = re.sub(r'<.*?>', ' ', text)
        text = re.sub(r'\s+', ' ', text).strip()

        # Split after ., ? or ! unless the period belongs to something like
        # "e.g." or a short title such as "Mr."
        sentences = re.split(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?|\!)\s', text)
        sentences = [s.strip() for s in sentences if len(s.strip()) > 10]
        if not sentences:
            return text

        # Adjust max_sentences based on the number of sentences available
        if len(sentences) < max_sentences:
            max_sentences = max(min_sentences, len(sentences))

        # Approximately 1 sentence per 50 words, bounded by min/max. The
        # budget is clamped to at least 1: a budget of 0 would make the
        # ``[-0:]`` slice below select every sentence.
        word_count = len(text.split())
        max_sentences = max(1, min_sentences,
                            min(max_sentences, word_count // 50))

        if len(sentences) <= max_sentences:
            summary = ' '.join(sentences)
        else:
            scores = self._calculate_sentence_scores(sentences)
            # Indices of the top sentences by score, restored to text order
            top_indices = sorted(np.argsort(scores)[-max_sentences:])
            summary = ' '.join(sentences[i] for i in top_indices)

        # Log performance
        duration = time.time() - start_time
        logging.debug(f"Summarization completed in {duration:.4f} seconds")

        # Update cache if requested and caching is enabled
        if cache_key and self.max_cache_size > 0:
            # Dicts preserve insertion order, so this evicts the oldest entry
            while len(self.summarization_cache) >= self.max_cache_size:
                self.summarization_cache.pop(next(iter(self.summarization_cache)))
            self.summarization_cache[cache_key] = summary
        return summary
class IngredientProcessor:
    """
    Process recipe ingredients into structured format.

    Each ingredient line is parsed into a dict with the keys "name",
    "amount" (float or None) and "unit" (singular form, or "" when absent).
    """

    # Unicode "vulgar fraction" characters and their numeric values.
    _VULGAR_FRACTIONS = {
        '¼': 0.25, '½': 0.5, '¾': 0.75, '⅓': 1 / 3, '⅔': 2 / 3,
        '⅕': 0.2, '⅖': 0.4, '⅗': 0.6, '⅘': 0.8, '⅙': 1 / 6,
        '⅚': 5 / 6, '⅛': 0.125, '⅜': 0.375, '⅝': 0.625, '⅞': 0.875
    }
    # Leading "½", "1½" or "1 ½".
    _LEADING_VULGAR_RE = re.compile(r'^\s*(\d+)?\s*([¼½¾⅓⅔⅕⅖⅗⅘⅙⅚⅛⅜⅝⅞])')
    # Leading "1 1/2", "1/2", "1.5" or "1".
    _QUANTITY_RE = re.compile(r'^\s*(\d+\s+\d+/\d+|\d+/\d+|\d+\.\d+|\d+)')
    # List markers: "1. " / "1) " numbering or a "-", "*", "•" bullet. The
    # delimiter and following whitespace are required so that real
    # quantities ("2 cups", "1.5 cups") are never mistaken for numbering.
    _LIST_MARKER_RE = re.compile(r'^(?:\d+[\.\)](?:\s+|$)|[-*•]\s+)')

    def __init__(self):
        """
        Initialize the ingredient processor with common units and measures.
        """
        # Common units and quantities for ingredient parsing
        self.common_units = [
            'tablespoon', 'tablespoons', 'teaspoon', 'teaspoons',
            'cup', 'cups', 'gram', 'grams', 'ounce', 'ounces',
            'pound', 'pounds', 'ml', 'g', 'kg', 'oz', 'lb',
            'pinch', 'handful', 'dash', 'slice', 'slices',
            'tbsp', 'tsp', 'tbsps', 'tsps', 'can', 'cans',
            'clove', 'cloves', 'bunch', 'bunches', 'stalk', 'stalks'
        ]
        logging.info("Initialized IngredientProcessor")

    def _clean_text(self, text):
        """
        Clean ingredient text.

        Args:
            text: Text to clean

        Returns:
            str: Text with parenthesized notes removed and whitespace collapsed
        """
        # Remove parentheses and their contents
        text = re.sub(r'\([^)]*\)', '', text)
        # Normalize whitespace
        return re.sub(r'\s+', ' ', text).strip()

    def _replace_vulgar_fractions(self, text):
        """Replace every unicode fraction character with its decimal value."""
        for symbol, value in self._VULGAR_FRACTIONS.items():
            text = text.replace(symbol, f" {value} ")
        return text

    def _extract_quantity(self, ingredient_text):
        """
        Extract quantity from ingredient text.

        Recognizes a leading integer, decimal, simple fraction ("1/2"), mixed
        fraction ("1 1/2") or unicode fraction ("½", "1½").

        Args:
            ingredient_text: Ingredient text

        Returns:
            tuple: (quantity, remaining_text); quantity is a float, or None
            when no usable quantity leads the text
        """
        # Unicode fractions are handled first: substituting them with
        # " 0.5 " up front would leave a leading space / split "1½" in two,
        # and the anchored pattern below would then miss or misread them.
        lead = self._LEADING_VULGAR_RE.match(ingredient_text)
        if lead:
            whole, symbol = lead.groups()
            quantity = (float(whole) if whole else 0.0) + self._VULGAR_FRACTIONS[symbol]
            remaining_text = self._replace_vulgar_fractions(
                ingredient_text[lead.end():]
            ).strip()
            return quantity, remaining_text

        # Any fractions later in the text are still converted to decimals
        ingredient_text = self._replace_vulgar_fractions(ingredient_text)

        match = self._QUANTITY_RE.match(ingredient_text)
        if not match:
            return None, ingredient_text

        quantity_text = match.group(1).strip()
        if '/' in quantity_text:
            # "1 1/2" -> whole part plus fraction; "1/2" -> fraction only
            *whole_part, fraction = quantity_text.split()
            num, denom = fraction.split('/')
            if float(denom) == 0:
                # "1/0" is not a quantity; leave the text untouched
                return None, ingredient_text
            quantity = float(num) / float(denom)
            if whole_part:
                quantity += float(whole_part[0])
        else:
            quantity = float(quantity_text)

        # Remove the quantity from the text
        remaining_text = ingredient_text[match.end():].strip()
        return quantity, remaining_text

    def _extract_unit(self, ingredient_text):
        """
        Extract unit from ingredient text.

        The first word is treated as a unit when it matches an entry of
        ``self.common_units``, ignoring case, trailing "s"/"es" plurals and
        a trailing "." or "," (as in "tsp.").

        Args:
            ingredient_text: Ingredient text

        Returns:
            tuple: (unit, remaining_text); unit is None when the first word
            is not a known unit
        """
        words = ingredient_text.split()
        if not words:
            return None, ingredient_text

        candidate = words[0].rstrip('.,')
        token = candidate.lower()
        # Compare with trailing "s" stripped on both sides so "lbs",
        # "handfuls" etc. match their listed singular forms.
        unit_stems = {unit.rstrip('s') for unit in self.common_units}
        is_unit = bool(token) and (
            token.rstrip('s') in unit_stems
            or (token.endswith('es') and token[:-2] in self.common_units)
        )
        if is_unit:
            return candidate, ' '.join(words[1:])
        return None, ingredient_text

    def _singularize_unit(self, unit):
        """Return the singular form of a recognized unit word."""
        lowered = unit.lower()
        # "bunches" -> "bunch", "pinches" -> "pinch" (plain "s" removal
        # would give "bunche")
        if lowered.endswith('es') and lowered[:-2] in self.common_units:
            return unit[:-2]
        if lowered.endswith('s') and lowered not in ('glass', 'swiss'):
            return unit[:-1]
        return unit

    def process_ingredient(self, ingredient_text):
        """
        Process a single ingredient into structured format.

        Args:
            ingredient_text: Text of the ingredient

        Returns:
            dict: Structured ingredient data with keys "name", "amount"
            (float or None) and "unit" (singular, "" if none was found)
        """
        cleaned_text = self._clean_text(ingredient_text)
        quantity, cleaned_text = self._extract_quantity(cleaned_text)
        unit, cleaned_text = self._extract_unit(cleaned_text)

        # Whatever is left after quantity and unit is the ingredient name
        return {
            "name": cleaned_text.strip(),
            "amount": quantity,
            "unit": self._singularize_unit(unit) if unit else ""
        }

    def extract_from_text(self, text):
        """
        Extract ingredients from text, one ingredient per line.

        Args:
            text: Text containing ingredients

        Returns:
            list: List of structured ingredients
        """
        # Remove HTML tags
        text = re.sub(r'<.*?>', ' ', text)

        ingredients = []
        for raw_line in text.split('\n'):
            line = raw_line.strip()
            # Skip blank or too-short lines
            if len(line) < 3:
                continue
            # Skip short "Label:" lines such as section headers
            if ':' in line and len(line.split(':')[0]) < 10:
                continue
            # Remove list numbering / bullets, but never a real quantity
            line = self._LIST_MARKER_RE.sub('', line)
            if not line:
                continue
            ingredients.append(self.process_ingredient(line))
        return ingredients

    def extract_from_recipe(self, recipe_dict):
        """
        Extract ingredients from a recipe dictionary.

        Uses the "ingredients" entry (a list or a newline-separated string)
        when it yields anything; otherwise looks for an "ingredients: ..."
        section inside the "instructions" string.

        Args:
            recipe_dict: Recipe dictionary

        Returns:
            list: List of structured ingredients
        """
        ingredients = []
        ingr_list = recipe_dict.get('ingredients')

        if isinstance(ingr_list, list):
            for ingredient in ingr_list:
                # Already-structured entries (including dicts shaped like
                # this class's own output) are passed through unchanged
                if isinstance(ingredient, dict) and (
                    'original' in ingredient or 'name' in ingredient
                ):
                    ingredients.append(ingredient)
                else:
                    ingredients.append(self.process_ingredient(str(ingredient)))
        elif isinstance(ingr_list, str):
            ingredients = self.extract_from_text(ingr_list)

        # Fall back to the instructions whenever nothing was found above,
        # including when the "ingredients" key exists but is empty
        if not ingredients:
            instructions = recipe_dict.get('instructions')
            if isinstance(instructions, str):
                # Look for a section that might contain ingredients
                ingredient_section = re.search(
                    r'ingredients:(.+?)(?:instructions|directions|method|steps):',
                    instructions.lower(), re.DOTALL)
                if ingredient_section:
                    ingredients = self.extract_from_text(ingredient_section.group(1))
        return ingredients