Spaces:
Sleeping
Sleeping
| # Smart Shopping Agent for Hugging Face Spaces | |
| # File: app.py | |
| import streamlit as st | |
| import pandas as pd | |
| import requests | |
| from bs4 import BeautifulSoup | |
| import json | |
| import re | |
| from typing import List, Dict, Any | |
| import time | |
| from dataclasses import dataclass | |
| import os | |
| import random | |
| from datetime import datetime | |
| # Configure Streamlit page | |
| st.set_page_config( | |
| page_title="Smart Shopping Agent", | |
| page_icon="π", | |
| layout="wide", | |
| initial_sidebar_state="expanded" | |
| ) | |
| # Product data structure | |
| class Product: | |
| name: str | |
| price: float | |
| rating: float | |
| specs: Dict[str, Any] | |
| source: str | |
| url: str = "" | |
| class ShoppingAgent: | |
| """Main shopping agent class that handles the complete workflow""" | |
| def __init__(self): | |
| self.search_history = [] | |
| def search_products(self, query: str) -> List[Dict]: | |
| """Search for products - using enhanced mock data for demo""" | |
| try: | |
| # Simulate realistic product search | |
| products = self._enhanced_mock_search(query) | |
| return products | |
| except Exception as e: | |
| st.error(f"Error searching products: {e}") | |
| return [] | |
| def _enhanced_mock_search(self, query: str) -> List[Dict]: | |
| """Enhanced mock search with realistic product data""" | |
| # Product categories and their typical features | |
| category_specs = { | |
| 'headphones': ['wireless', 'noise_cancelling', 'battery_life', 'driver_size'], | |
| 'laptop': ['processor', 'ram', 'storage', 'screen_size', 'graphics'], | |
| 'smartphone': ['camera_mp', 'battery_mah', 'storage_gb', 'screen_inches', 'processor'], | |
| 'smartwatch': ['battery_days', 'water_resistance', 'health_sensors', 'compatibility'], | |
| 'tablet': ['screen_size', 'storage', 'battery_hours', 'processor', 'weight'], | |
| 'speaker': ['power_watts', 'battery_hours', 'water_rating', 'connectivity'] | |
| } | |
| # Determine category based on query | |
| category = 'general' | |
| for cat in category_specs.keys(): | |
| if cat in query.lower(): | |
| category = cat | |
| break | |
| # Generate realistic products | |
| products = [] | |
| brand_pools = { | |
| 'headphones': ['Sony', 'Bose', 'Apple', 'Samsung', 'JBL', 'Audio-Technica', 'Sennheiser'], | |
| 'laptop': ['Apple', 'Dell', 'HP', 'Lenovo', 'ASUS', 'Acer', 'MSI'], | |
| 'smartphone': ['Apple', 'Samsung', 'Google', 'OnePlus', 'Xiaomi', 'Huawei'], | |
| 'smartwatch': ['Apple', 'Samsung', 'Fitbit', 'Garmin', 'Amazfit', 'Fossil'], | |
| 'tablet': ['Apple', 'Samsung', 'Microsoft', 'Amazon', 'Lenovo'], | |
| 'speaker': ['JBL', 'Bose', 'Sony', 'Ultimate Ears', 'Marshall', 'Harman Kardon'] | |
| } | |
| brands = brand_pools.get(category, ['BrandA', 'BrandB', 'BrandC', 'BrandD', 'BrandE']) | |
| stores = ['Amazon', 'Best Buy', 'Target', 'Walmart', 'Newegg', 'B&H Photo'] | |
| for i in range(8): # Generate 8 products | |
| brand = random.choice(brands) | |
| model_suffix = random.choice(['Pro', 'Max', 'Ultra', 'Plus', 'Air', 'Mini', 'SE', 'Standard']) | |
| # Generate realistic specs based on category | |
| specs = {'brand': brand} | |
| if category in category_specs: | |
| for spec in category_specs[category]: | |
| if spec == 'battery_life': | |
| specs[spec] = f"{random.randint(20, 40)} hours" | |
| elif spec == 'driver_size': | |
| specs[spec] = f"{random.randint(40, 50)}mm" | |
| elif spec == 'processor': | |
| processors = ['Intel i5', 'Intel i7', 'AMD Ryzen 5', 'AMD Ryzen 7', 'Apple M1', 'Apple M2'] | |
| specs[spec] = random.choice(processors) | |
| elif spec == 'ram': | |
| specs[spec] = f"{random.choice([8, 16, 32])}GB" | |
| elif spec == 'storage': | |
| specs[spec] = f"{random.choice([256, 512, 1000])}GB SSD" | |
| elif spec == 'camera_mp': | |
| specs[spec] = f"{random.randint(48, 108)}MP" | |
| else: | |
| specs[spec] = f"High Quality {spec.replace('_', ' ').title()}" | |
| # Add common specs | |
| specs.update({ | |
| 'warranty': f"{random.randint(1, 3)} year(s)", | |
| 'availability': random.choice(['In Stock', 'Limited Stock', '2-3 days shipping']), | |
| 'color_options': random.choice(['Black, White', 'Multiple Colors', 'Black, Silver, Gold']) | |
| }) | |
| # Generate realistic pricing based on brand and category | |
| base_prices = { | |
| 'headphones': (50, 400), | |
| 'laptop': (400, 2500), | |
| 'smartphone': (200, 1200), | |
| 'smartwatch': (100, 800), | |
| 'tablet': (150, 1000), | |
| 'speaker': (30, 300) | |
| } | |
| price_range = base_prices.get(category, (50, 500)) | |
| base_price = random.uniform(price_range[0], price_range[1]) | |
| # Premium brands cost more | |
| if brand in ['Apple', 'Sony', 'Bose']: | |
| base_price *= 1.3 | |
| product = { | |
| 'name': f"{brand} {query.title()} {model_suffix}", | |
| 'price': round(base_price, 2), | |
| 'rating': round(random.uniform(3.2, 4.9), 1), | |
| 'specs': specs, | |
| 'source': random.choice(stores), | |
| 'url': f"https://{random.choice(stores).lower().replace(' ', '')}.com/product/{query.replace(' ', '-')}-{i}", | |
| 'reviews_count': random.randint(50, 2000), | |
| 'shipping': random.choice(['Free shipping', '$5.99 shipping', 'Prime eligible']) | |
| } | |
| products.append(product) | |
| # Sort by a mix of rating and randomness for realistic results | |
| products.sort(key=lambda x: x['rating'] + random.uniform(-0.3, 0.3), reverse=True) | |
| return products | |
| def compare_products(self, products: List[Dict]) -> Dict: | |
| """Compare products and generate analysis""" | |
| if not products: | |
| return {} | |
| try: | |
| comparison = { | |
| 'total_products': len(products), | |
| 'search_timestamp': datetime.now().isoformat(), | |
| 'price_analysis': { | |
| 'min': min(p['price'] for p in products), | |
| 'max': max(p['price'] for p in products), | |
| 'average': round(sum(p['price'] for p in products) / len(products), 2), | |
| 'median': round(sorted([p['price'] for p in products])[len(products)//2], 2) | |
| }, | |
| 'rating_analysis': { | |
| 'min': min(p['rating'] for p in products), | |
| 'max': max(p['rating'] for p in products), | |
| 'average': round(sum(p['rating'] for p in products) / len(products), 1), | |
| 'median': round(sorted([p['rating'] for p in products])[len(products)//2], 1) | |
| }, | |
| 'store_distribution': {}, | |
| 'brand_distribution': {}, | |
| 'products': products | |
| } | |
| # Store distribution | |
| for product in products: | |
| store = product['source'] | |
| comparison['store_distribution'][store] = comparison['store_distribution'].get(store, 0) + 1 | |
| # Brand distribution | |
| for product in products: | |
| brand = product['specs'].get('brand', 'Unknown') | |
| comparison['brand_distribution'][brand] = comparison['brand_distribution'].get(brand, 0) + 1 | |
| return comparison | |
| except Exception as e: | |
| st.error(f"Error in product comparison: {e}") | |
| return {} | |
| def generate_recommendations(self, comparison: Dict, preferences: str = "") -> Dict: | |
| """Generate smart recommendations based on analysis""" | |
| if not comparison or not comparison['products']: | |
| return {} | |
| try: | |
| products = comparison['products'] | |
| # Enhanced scoring algorithm | |
| scored_products = [] | |
| for product in products: | |
| # Normalize scores (0-1 range) | |
| price_score = 1 - ((product['price'] - comparison['price_analysis']['min']) / | |
| (comparison['price_analysis']['max'] - comparison['price_analysis']['min'])) | |
| rating_score = ((product['rating'] - comparison['rating_analysis']['min']) / | |
| (comparison['rating_analysis']['max'] - comparison['rating_analysis']['min'])) | |
| # Reviews count influence (more reviews = more reliable) | |
| reviews_score = min(product.get('reviews_count', 0) / 1000, 1.0) # Cap at 1000 reviews | |
| # Preference-based scoring | |
| preference_score = 0 | |
| if preferences: | |
| pref_lower = preferences.lower() | |
| # Brand preference | |
| if product['specs'].get('brand', '').lower() in pref_lower: | |
| preference_score += 0.2 | |
| # Budget preference | |
| if 'budget' in pref_lower and product['price'] < comparison['price_analysis']['average']: | |
| preference_score += 0.15 | |
| # Quality preference | |
| if 'quality' in pref_lower or 'best' in pref_lower: | |
| preference_score += (rating_score * 0.1) | |
| # Composite score with weights | |
| composite_score = ( | |
| price_score * 0.30 + # 30% price importance | |
| rating_score * 0.40 + # 40% rating importance | |
| reviews_score * 0.15 + # 15% reviews reliability | |
| preference_score * 0.15 # 15% preference matching | |
| ) | |
| scored_products.append({ | |
| **product, | |
| 'composite_score': round(composite_score, 3), | |
| 'price_score': round(price_score, 3), | |
| 'rating_score': round(rating_score, 3), | |
| 'reviews_score': round(reviews_score, 3), | |
| 'preference_score': round(preference_score, 3) | |
| }) | |
| # Sort by composite score | |
| scored_products.sort(key=lambda x: x['composite_score'], reverse=True) | |
| # Find category winners | |
| best_price = min(products, key=lambda x: x['price']) | |
| highest_rated = max(products, key=lambda x: x['rating']) | |
| most_reviewed = max(products, key=lambda x: x.get('reviews_count', 0)) | |
| recommendations = { | |
| 'best_overall': scored_products[0], | |
| 'best_value': best_price, | |
| 'highest_rated': highest_rated, | |
| 'most_reviewed': most_reviewed, | |
| 'top_3': scored_products[:3], | |
| 'budget_options': [p for p in scored_products if p['price'] < comparison['price_analysis']['average']][:2], | |
| 'premium_options': [p for p in scored_products if p['price'] > comparison['price_analysis']['average']][:2] | |
| } | |
| return recommendations | |
| except Exception as e: | |
| st.error(f"Error generating recommendations: {e}") | |
| return {} | |
| # Initialize the shopping agent | |
| def get_shopping_agent(): | |
| return ShoppingAgent() | |
| # Main Streamlit UI | |
| def main(): | |
| st.title("π Smart Shopping Agent") | |
| st.markdown("### Your AI-powered shopping assistant that finds, compares, and recommends the best products!") | |
| # Initialize agent | |
| agent = get_shopping_agent() | |
| # Sidebar | |
| with st.sidebar: | |
| st.header("π§ Agent Info") | |
| st.markdown("**Platform:** Hugging Face Spaces") | |
| st.markdown("**Framework:** Custom Multi-Agent System") | |
| st.markdown("**UI:** Streamlit") | |
| st.markdown("**Status:** β Online") | |
| st.header("π― How it works") | |
| st.markdown(""" | |
| 1. **π Search**: Find products across multiple stores | |
| 2. **π Compare**: Analyze prices, ratings, and features | |
| 3. **π Recommend**: Get personalized suggestions | |
| 4. **π Insights**: Market analysis and trends | |
| """) | |
| st.header("π Features") | |
| st.markdown(""" | |
| - Multi-store price comparison | |
| - Smart recommendation scoring | |
| - Detailed product specifications | |
| - Market trend analysis | |
| - Export comparison reports | |
| """) | |
| # Main search interface | |
| st.header("π Product Search") | |
| col1, col2 = st.columns([3, 1]) | |
| with col1: | |
| product_query = st.text_input( | |
| "What product are you looking for?", | |
| placeholder="e.g., wireless headphones, gaming laptop, smartphone", | |
| key="product_search" | |
| ) | |
| with col2: | |
| st.markdown("**Popular Searches:**") | |
| popular_searches = [ | |
| "wireless headphones", | |
| "gaming laptop", | |
| "smartphone", | |
| "smartwatch", | |
| "bluetooth speaker" | |
| ] | |
| for search in popular_searches: | |
| if st.button(f"π {search}", key=f"popular_{search}"): | |
| st.session_state.product_search = search | |
| st.experimental_rerun() | |
| # Preferences section | |
| with st.expander("βοΈ Search Preferences (Optional)", expanded=False): | |
| col1, col2, col3 = st.columns(3) | |
| with col1: | |
| budget_range = st.selectbox( | |
| "Budget Range", | |
| ["Any Budget", "Under $100", "$100-$500", "$500-$1000", "Over $1000"] | |
| ) | |
| with col2: | |
| brand_preference = st.text_input( | |
| "Preferred Brands", | |
| placeholder="e.g., Apple, Sony, Samsung" | |
| ) | |
| with col3: | |
| priority = st.selectbox( | |
| "Priority", | |
| ["Best Overall", "Best Value", "Highest Quality", "Most Popular"] | |
| ) | |
| preferences = f"Budget: {budget_range}. Brands: {brand_preference}. Priority: {priority}." | |
| # Search button | |
| if st.button("π Find Best Products", type="primary", use_container_width=True): | |
| if product_query: | |
| # Store search in session state | |
| if 'search_results' not in st.session_state: | |
| st.session_state.search_results = {} | |
| # Progress indicator | |
| progress_container = st.container() | |
| with progress_container: | |
| progress_bar = st.progress(0) | |
| status_text = st.empty() | |
| # Step 1: Search | |
| status_text.text("π Searching for products...") | |
| progress_bar.progress(25) | |
| products = agent.search_products(product_query) | |
| # Step 2: Compare | |
| status_text.text("π Comparing features and prices...") | |
| progress_bar.progress(50) | |
| comparison = agent.compare_products(products) | |
| # Step 3: Generate recommendations | |
| status_text.text("π― Generating personalized recommendations...") | |
| progress_bar.progress(75) | |
| recommendations = agent.generate_recommendations(comparison, preferences) | |
| # Step 4: Complete | |
| status_text.text("β Analysis complete!") | |
| progress_bar.progress(100) | |
| time.sleep(1) | |
| # Clear progress | |
| progress_container.empty() | |
| if products and comparison and recommendations: | |
| # Store results | |
| st.session_state.search_results = { | |
| 'query': product_query, | |
| 'products': products, | |
| 'comparison': comparison, | |
| 'recommendations': recommendations, | |
| 'preferences': preferences | |
| } | |
| st.success(f"β Found {len(products)} products! Analysis complete.") | |
| else: | |
| st.warning("Please enter a product to search for.") | |
| # Display results if available | |
| if 'search_results' in st.session_state and st.session_state.search_results: | |
| display_results(st.session_state.search_results) | |
| def display_results(results): | |
| """Display comprehensive search results""" | |
| products = results['products'] | |
| comparison = results['comparison'] | |
| recommendations = results['recommendations'] | |
| # Top Recommendations Section | |
| st.header("π Top Recommendations") | |
| # Best Overall | |
| if 'best_overall' in recommendations: | |
| best = recommendations['best_overall'] | |
| st.subheader("π₯ Best Overall Choice") | |
| col1, col2, col3, col4 = st.columns(4) | |
| with col1: | |
| st.metric("Price", f"${best['price']}") | |
| with col2: | |
| st.metric("Rating", f"{best['rating']}/5") | |
| with col3: | |
| st.metric("Reviews", f"{best.get('reviews_count', 0):,}") | |
| with col4: | |
| st.metric("Score", f"{best['composite_score']:.2f}") | |
| st.markdown(f"**{best['name']}**") | |
| st.markdown(f"**Store:** {best['source']} | **Brand:** {best['specs'].get('brand', 'N/A')}") | |
| st.markdown(f"**Shipping:** {best.get('shipping', 'Standard shipping')}") | |
| # Category Winners | |
| st.subheader("ποΈ Category Winners") | |
| col1, col2, col3 = st.columns(3) | |
| with col1: | |
| st.markdown("**π° Best Value**") | |
| if 'best_value' in recommendations: | |
| best_value = recommendations['best_value'] | |
| st.markdown(f"{best_value['name']}") | |
| st.markdown(f"${best_value['price']} β’ {best_value['rating']}/5") | |
| st.markdown(f"Store: {best_value['source']}") | |
| with col2: | |
| st.markdown("**β Highest Rated**") | |
| if 'highest_rated' in recommendations: | |
| highest = recommendations['highest_rated'] | |
| st.markdown(f"{highest['name']}") | |
| st.markdown(f"${highest['price']} β’ {highest['rating']}/5") | |
| st.markdown(f"Store: {highest['source']}") | |
| with col3: | |
| st.markdown("**π₯ Most Reviewed**") | |
| if 'most_reviewed' in recommendations: | |
| most_rev = recommendations['most_reviewed'] | |
| st.markdown(f"{most_rev['name']}") | |
| st.markdown(f"${most_rev['price']} β’ {most_rev.get('reviews_count', 0):,} reviews") | |
| st.markdown(f"Store: {most_rev['source']}") | |
| # Market Analysis | |
| st.header("π Market Analysis") | |
| col1, col2, col3, col4 = st.columns(4) | |
| with col1: | |
| st.metric( | |
| "Price Range", | |
| f"${comparison['price_analysis']['min']:.0f} - ${comparison['price_analysis']['max']:.0f}" | |
| ) | |
| with col2: | |
| st.metric( | |
| "Average Price", | |
| f"${comparison['price_analysis']['average']:.0f}" | |
| ) | |
| with col3: | |
| st.metric( | |
| "Average Rating", | |
| f"{comparison['rating_analysis']['average']}/5" | |
| ) | |
| with col4: | |
| st.metric( | |
| "Total Products", | |
| f"{comparison['total_products']}" | |
| ) | |
| # Detailed Comparison Table | |
| st.header("π Product Comparison") | |
| # Create comparison DataFrame | |
| df_data = [] | |
| for product in products: | |
| df_data.append({ | |
| 'Product': product['name'], | |
| 'Price': f"${product['price']}", | |
| 'Rating': f"{product['rating']}/5", | |
| 'Reviews': f"{product.get('reviews_count', 0):,}", | |
| 'Brand': product['specs'].get('brand', 'N/A'), | |
| 'Store': product['source'], | |
| 'Shipping': product.get('shipping', 'Standard') | |
| }) | |
| df = pd.DataFrame(df_data) | |
| st.dataframe(df, use_container_width=True, hide_index=True) | |
| # Store and Brand Distribution | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| st.subheader("πͺ Store Distribution") | |
| store_dist = comparison.get('store_distribution', {}) | |
| if store_dist: | |
| st.bar_chart(store_dist) | |
| with col2: | |
| st.subheader("π·οΈ Brand Distribution") | |
| brand_dist = comparison.get('brand_distribution', {}) | |
| if brand_dist: | |
| st.bar_chart(brand_dist) | |
| # Detailed Product Cards | |
| st.header("π Detailed Product Information") | |
| # Show top 5 products in expandable cards | |
| top_products = recommendations.get('top_3', products[:3]) | |
| for i, product in enumerate(top_products): | |
| with st.expander(f"π {product['name']} - ${product['price']}", expanded=(i==0)): | |
| col1, col2 = st.columns([2, 1]) | |
| with col1: | |
| st.markdown(f"**Price:** ${product['price']}") | |
| st.markdown(f"**Rating:** {product['rating']}/5 ({product.get('reviews_count', 0):,} reviews)") | |
| st.markdown(f"**Store:** {product['source']}") | |
| st.markdown(f"**Shipping:** {product.get('shipping', 'Standard shipping')}") | |
| # Specifications | |
| st.markdown("**Specifications:**") | |
| specs_text = [] | |
| for key, value in product['specs'].items(): | |
| if key != 'brand': # Brand already shown above | |
| specs_text.append(f"β’ **{key.replace('_', ' ').title()}:** {value}") | |
| st.markdown('\n'.join(specs_text)) | |
| with col2: | |
| if 'composite_score' in product: | |
| st.markdown("**Scoring Breakdown:**") | |
| st.markdown(f"Overall Score: {product['composite_score']:.3f}") | |
| st.markdown(f"Price Score: {product.get('price_score', 0):.3f}") | |
| st.markdown(f"Rating Score: {product.get('rating_score', 0):.3f}") | |
| st.markdown(f"Reviews Score: {product.get('reviews_score', 0):.3f}") | |
| if st.button(f"π View Product", key=f"view_{i}"): | |
| st.markdown(f"[Open in {product['source']}]({product['url']})") | |
| # Export Section | |
| st.header("π₯ Export Results") | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| # JSON Export | |
| if st.button("π Download JSON Report"): | |
| report_data = { | |
| 'search_query': results['query'], | |
| 'search_timestamp': datetime.now().isoformat(), | |
| 'preferences': results.get('preferences', ''), | |
| 'summary': { | |
| 'total_products': len(products), | |
| 'price_range': f"${comparison['price_analysis']['min']:.0f} - ${comparison['price_analysis']['max']:.0f}", | |
| 'average_rating': f"{comparison['rating_analysis']['average']:.1f}/5", | |
| 'best_overall': recommendations.get('best_overall', {}).get('name', 'N/A') | |
| }, | |
| 'products': products, | |
| 'detailed_analysis': comparison, | |
| 'recommendations': recommendations | |
| } | |
| st.download_button( | |
| label="πΎ Download Complete Report", | |
| data=json.dumps(report_data, indent=2), | |
| file_name=f"shopping_report_{results['query'].replace(' ', '_')}.json", | |
| mime="application/json" | |
| ) | |
| with col2: | |
| # CSV Export | |
| if st.button("π Download CSV"): | |
| csv_data = pd.DataFrame(df_data) | |
| st.download_button( | |
| label="π Download Comparison CSV", | |
| data=csv_data.to_csv(index=False), | |
| file_name=f"product_comparison_{results['query'].replace(' ', '_')}.csv", | |
| mime="text/csv" | |
| ) | |
| # Add footer with instructions | |
| def show_footer(): | |
| st.markdown("---") | |
| st.markdown(""" | |
| <div style='text-align: center; color: #666;'> | |
| <h4>π Deploy on Hugging Face Spaces</h4> | |
| <p>Save this as <code>app.py</code> and create a new Space with Streamlit SDK</p> | |
| <p>No additional setup required - runs directly on HF Spaces!</p> | |
| </div> | |
| """, unsafe_allow_html=True) | |
| if __name__ == "__main__": | |
| main() | |
| show_footer() | |
| # Requirements.txt content for Hugging Face Spaces: | |
| """ | |
| streamlit==1.28.0 | |
| pandas==2.0.3 | |
| requests==2.31.0 | |
| beautifulsoup4==4.12.2 | |
| """ |