zarashahid's picture
Create app.py
79bf10e verified
# Smart Shopping Agent for Hugging Face Spaces
# File: app.py
import streamlit as st
import pandas as pd
import requests
from bs4 import BeautifulSoup
import json
import re
from typing import List, Dict, Any
import time
from dataclasses import dataclass
import os
import random
from datetime import datetime
# Configure Streamlit page
st.set_page_config(
page_title="Smart Shopping Agent",
page_icon="πŸ›’",
layout="wide",
initial_sidebar_state="expanded"
)
# Product data structure
@dataclass
class Product:
name: str
price: float
rating: float
specs: Dict[str, Any]
source: str
url: str = ""
class ShoppingAgent:
"""Main shopping agent class that handles the complete workflow"""
def __init__(self):
self.search_history = []
def search_products(self, query: str) -> List[Dict]:
"""Search for products - using enhanced mock data for demo"""
try:
# Simulate realistic product search
products = self._enhanced_mock_search(query)
return products
except Exception as e:
st.error(f"Error searching products: {e}")
return []
def _enhanced_mock_search(self, query: str) -> List[Dict]:
"""Enhanced mock search with realistic product data"""
# Product categories and their typical features
category_specs = {
'headphones': ['wireless', 'noise_cancelling', 'battery_life', 'driver_size'],
'laptop': ['processor', 'ram', 'storage', 'screen_size', 'graphics'],
'smartphone': ['camera_mp', 'battery_mah', 'storage_gb', 'screen_inches', 'processor'],
'smartwatch': ['battery_days', 'water_resistance', 'health_sensors', 'compatibility'],
'tablet': ['screen_size', 'storage', 'battery_hours', 'processor', 'weight'],
'speaker': ['power_watts', 'battery_hours', 'water_rating', 'connectivity']
}
# Determine category based on query
category = 'general'
for cat in category_specs.keys():
if cat in query.lower():
category = cat
break
# Generate realistic products
products = []
brand_pools = {
'headphones': ['Sony', 'Bose', 'Apple', 'Samsung', 'JBL', 'Audio-Technica', 'Sennheiser'],
'laptop': ['Apple', 'Dell', 'HP', 'Lenovo', 'ASUS', 'Acer', 'MSI'],
'smartphone': ['Apple', 'Samsung', 'Google', 'OnePlus', 'Xiaomi', 'Huawei'],
'smartwatch': ['Apple', 'Samsung', 'Fitbit', 'Garmin', 'Amazfit', 'Fossil'],
'tablet': ['Apple', 'Samsung', 'Microsoft', 'Amazon', 'Lenovo'],
'speaker': ['JBL', 'Bose', 'Sony', 'Ultimate Ears', 'Marshall', 'Harman Kardon']
}
brands = brand_pools.get(category, ['BrandA', 'BrandB', 'BrandC', 'BrandD', 'BrandE'])
stores = ['Amazon', 'Best Buy', 'Target', 'Walmart', 'Newegg', 'B&H Photo']
for i in range(8): # Generate 8 products
brand = random.choice(brands)
model_suffix = random.choice(['Pro', 'Max', 'Ultra', 'Plus', 'Air', 'Mini', 'SE', 'Standard'])
# Generate realistic specs based on category
specs = {'brand': brand}
if category in category_specs:
for spec in category_specs[category]:
if spec == 'battery_life':
specs[spec] = f"{random.randint(20, 40)} hours"
elif spec == 'driver_size':
specs[spec] = f"{random.randint(40, 50)}mm"
elif spec == 'processor':
processors = ['Intel i5', 'Intel i7', 'AMD Ryzen 5', 'AMD Ryzen 7', 'Apple M1', 'Apple M2']
specs[spec] = random.choice(processors)
elif spec == 'ram':
specs[spec] = f"{random.choice([8, 16, 32])}GB"
elif spec == 'storage':
specs[spec] = f"{random.choice([256, 512, 1000])}GB SSD"
elif spec == 'camera_mp':
specs[spec] = f"{random.randint(48, 108)}MP"
else:
specs[spec] = f"High Quality {spec.replace('_', ' ').title()}"
# Add common specs
specs.update({
'warranty': f"{random.randint(1, 3)} year(s)",
'availability': random.choice(['In Stock', 'Limited Stock', '2-3 days shipping']),
'color_options': random.choice(['Black, White', 'Multiple Colors', 'Black, Silver, Gold'])
})
# Generate realistic pricing based on brand and category
base_prices = {
'headphones': (50, 400),
'laptop': (400, 2500),
'smartphone': (200, 1200),
'smartwatch': (100, 800),
'tablet': (150, 1000),
'speaker': (30, 300)
}
price_range = base_prices.get(category, (50, 500))
base_price = random.uniform(price_range[0], price_range[1])
# Premium brands cost more
if brand in ['Apple', 'Sony', 'Bose']:
base_price *= 1.3
product = {
'name': f"{brand} {query.title()} {model_suffix}",
'price': round(base_price, 2),
'rating': round(random.uniform(3.2, 4.9), 1),
'specs': specs,
'source': random.choice(stores),
'url': f"https://{random.choice(stores).lower().replace(' ', '')}.com/product/{query.replace(' ', '-')}-{i}",
'reviews_count': random.randint(50, 2000),
'shipping': random.choice(['Free shipping', '$5.99 shipping', 'Prime eligible'])
}
products.append(product)
# Sort by a mix of rating and randomness for realistic results
products.sort(key=lambda x: x['rating'] + random.uniform(-0.3, 0.3), reverse=True)
return products
def compare_products(self, products: List[Dict]) -> Dict:
"""Compare products and generate analysis"""
if not products:
return {}
try:
comparison = {
'total_products': len(products),
'search_timestamp': datetime.now().isoformat(),
'price_analysis': {
'min': min(p['price'] for p in products),
'max': max(p['price'] for p in products),
'average': round(sum(p['price'] for p in products) / len(products), 2),
'median': round(sorted([p['price'] for p in products])[len(products)//2], 2)
},
'rating_analysis': {
'min': min(p['rating'] for p in products),
'max': max(p['rating'] for p in products),
'average': round(sum(p['rating'] for p in products) / len(products), 1),
'median': round(sorted([p['rating'] for p in products])[len(products)//2], 1)
},
'store_distribution': {},
'brand_distribution': {},
'products': products
}
# Store distribution
for product in products:
store = product['source']
comparison['store_distribution'][store] = comparison['store_distribution'].get(store, 0) + 1
# Brand distribution
for product in products:
brand = product['specs'].get('brand', 'Unknown')
comparison['brand_distribution'][brand] = comparison['brand_distribution'].get(brand, 0) + 1
return comparison
except Exception as e:
st.error(f"Error in product comparison: {e}")
return {}
def generate_recommendations(self, comparison: Dict, preferences: str = "") -> Dict:
"""Generate smart recommendations based on analysis"""
if not comparison or not comparison['products']:
return {}
try:
products = comparison['products']
# Enhanced scoring algorithm
scored_products = []
for product in products:
# Normalize scores (0-1 range)
price_score = 1 - ((product['price'] - comparison['price_analysis']['min']) /
(comparison['price_analysis']['max'] - comparison['price_analysis']['min']))
rating_score = ((product['rating'] - comparison['rating_analysis']['min']) /
(comparison['rating_analysis']['max'] - comparison['rating_analysis']['min']))
# Reviews count influence (more reviews = more reliable)
reviews_score = min(product.get('reviews_count', 0) / 1000, 1.0) # Cap at 1000 reviews
# Preference-based scoring
preference_score = 0
if preferences:
pref_lower = preferences.lower()
# Brand preference
if product['specs'].get('brand', '').lower() in pref_lower:
preference_score += 0.2
# Budget preference
if 'budget' in pref_lower and product['price'] < comparison['price_analysis']['average']:
preference_score += 0.15
# Quality preference
if 'quality' in pref_lower or 'best' in pref_lower:
preference_score += (rating_score * 0.1)
# Composite score with weights
composite_score = (
price_score * 0.30 + # 30% price importance
rating_score * 0.40 + # 40% rating importance
reviews_score * 0.15 + # 15% reviews reliability
preference_score * 0.15 # 15% preference matching
)
scored_products.append({
**product,
'composite_score': round(composite_score, 3),
'price_score': round(price_score, 3),
'rating_score': round(rating_score, 3),
'reviews_score': round(reviews_score, 3),
'preference_score': round(preference_score, 3)
})
# Sort by composite score
scored_products.sort(key=lambda x: x['composite_score'], reverse=True)
# Find category winners
best_price = min(products, key=lambda x: x['price'])
highest_rated = max(products, key=lambda x: x['rating'])
most_reviewed = max(products, key=lambda x: x.get('reviews_count', 0))
recommendations = {
'best_overall': scored_products[0],
'best_value': best_price,
'highest_rated': highest_rated,
'most_reviewed': most_reviewed,
'top_3': scored_products[:3],
'budget_options': [p for p in scored_products if p['price'] < comparison['price_analysis']['average']][:2],
'premium_options': [p for p in scored_products if p['price'] > comparison['price_analysis']['average']][:2]
}
return recommendations
except Exception as e:
st.error(f"Error generating recommendations: {e}")
return {}
# Initialize the shopping agent
@st.cache_resource
def get_shopping_agent():
return ShoppingAgent()
# Main Streamlit UI
def main():
st.title("πŸ›’ Smart Shopping Agent")
st.markdown("### Your AI-powered shopping assistant that finds, compares, and recommends the best products!")
# Initialize agent
agent = get_shopping_agent()
# Sidebar
with st.sidebar:
st.header("πŸ”§ Agent Info")
st.markdown("**Platform:** Hugging Face Spaces")
st.markdown("**Framework:** Custom Multi-Agent System")
st.markdown("**UI:** Streamlit")
st.markdown("**Status:** βœ… Online")
st.header("🎯 How it works")
st.markdown("""
1. **πŸ” Search**: Find products across multiple stores
2. **πŸ“Š Compare**: Analyze prices, ratings, and features
3. **πŸ† Recommend**: Get personalized suggestions
4. **πŸ“ˆ Insights**: Market analysis and trends
""")
st.header("🌟 Features")
st.markdown("""
- Multi-store price comparison
- Smart recommendation scoring
- Detailed product specifications
- Market trend analysis
- Export comparison reports
""")
# Main search interface
st.header("πŸ” Product Search")
col1, col2 = st.columns([3, 1])
with col1:
product_query = st.text_input(
"What product are you looking for?",
placeholder="e.g., wireless headphones, gaming laptop, smartphone",
key="product_search"
)
with col2:
st.markdown("**Popular Searches:**")
popular_searches = [
"wireless headphones",
"gaming laptop",
"smartphone",
"smartwatch",
"bluetooth speaker"
]
for search in popular_searches:
if st.button(f"πŸ” {search}", key=f"popular_{search}"):
st.session_state.product_search = search
st.experimental_rerun()
# Preferences section
with st.expander("βš™οΈ Search Preferences (Optional)", expanded=False):
col1, col2, col3 = st.columns(3)
with col1:
budget_range = st.selectbox(
"Budget Range",
["Any Budget", "Under $100", "$100-$500", "$500-$1000", "Over $1000"]
)
with col2:
brand_preference = st.text_input(
"Preferred Brands",
placeholder="e.g., Apple, Sony, Samsung"
)
with col3:
priority = st.selectbox(
"Priority",
["Best Overall", "Best Value", "Highest Quality", "Most Popular"]
)
preferences = f"Budget: {budget_range}. Brands: {brand_preference}. Priority: {priority}."
# Search button
if st.button("πŸš€ Find Best Products", type="primary", use_container_width=True):
if product_query:
# Store search in session state
if 'search_results' not in st.session_state:
st.session_state.search_results = {}
# Progress indicator
progress_container = st.container()
with progress_container:
progress_bar = st.progress(0)
status_text = st.empty()
# Step 1: Search
status_text.text("πŸ” Searching for products...")
progress_bar.progress(25)
products = agent.search_products(product_query)
# Step 2: Compare
status_text.text("πŸ“Š Comparing features and prices...")
progress_bar.progress(50)
comparison = agent.compare_products(products)
# Step 3: Generate recommendations
status_text.text("🎯 Generating personalized recommendations...")
progress_bar.progress(75)
recommendations = agent.generate_recommendations(comparison, preferences)
# Step 4: Complete
status_text.text("βœ… Analysis complete!")
progress_bar.progress(100)
time.sleep(1)
# Clear progress
progress_container.empty()
if products and comparison and recommendations:
# Store results
st.session_state.search_results = {
'query': product_query,
'products': products,
'comparison': comparison,
'recommendations': recommendations,
'preferences': preferences
}
st.success(f"βœ… Found {len(products)} products! Analysis complete.")
else:
st.warning("Please enter a product to search for.")
# Display results if available
if 'search_results' in st.session_state and st.session_state.search_results:
display_results(st.session_state.search_results)
def display_results(results):
"""Display comprehensive search results"""
products = results['products']
comparison = results['comparison']
recommendations = results['recommendations']
# Top Recommendations Section
st.header("πŸ† Top Recommendations")
# Best Overall
if 'best_overall' in recommendations:
best = recommendations['best_overall']
st.subheader("πŸ₯‡ Best Overall Choice")
col1, col2, col3, col4 = st.columns(4)
with col1:
st.metric("Price", f"${best['price']}")
with col2:
st.metric("Rating", f"{best['rating']}/5")
with col3:
st.metric("Reviews", f"{best.get('reviews_count', 0):,}")
with col4:
st.metric("Score", f"{best['composite_score']:.2f}")
st.markdown(f"**{best['name']}**")
st.markdown(f"**Store:** {best['source']} | **Brand:** {best['specs'].get('brand', 'N/A')}")
st.markdown(f"**Shipping:** {best.get('shipping', 'Standard shipping')}")
# Category Winners
st.subheader("πŸŽ–οΈ Category Winners")
col1, col2, col3 = st.columns(3)
with col1:
st.markdown("**πŸ’° Best Value**")
if 'best_value' in recommendations:
best_value = recommendations['best_value']
st.markdown(f"{best_value['name']}")
st.markdown(f"${best_value['price']} β€’ {best_value['rating']}/5")
st.markdown(f"Store: {best_value['source']}")
with col2:
st.markdown("**⭐ Highest Rated**")
if 'highest_rated' in recommendations:
highest = recommendations['highest_rated']
st.markdown(f"{highest['name']}")
st.markdown(f"${highest['price']} β€’ {highest['rating']}/5")
st.markdown(f"Store: {highest['source']}")
with col3:
st.markdown("**πŸ‘₯ Most Reviewed**")
if 'most_reviewed' in recommendations:
most_rev = recommendations['most_reviewed']
st.markdown(f"{most_rev['name']}")
st.markdown(f"${most_rev['price']} β€’ {most_rev.get('reviews_count', 0):,} reviews")
st.markdown(f"Store: {most_rev['source']}")
# Market Analysis
st.header("πŸ“ˆ Market Analysis")
col1, col2, col3, col4 = st.columns(4)
with col1:
st.metric(
"Price Range",
f"${comparison['price_analysis']['min']:.0f} - ${comparison['price_analysis']['max']:.0f}"
)
with col2:
st.metric(
"Average Price",
f"${comparison['price_analysis']['average']:.0f}"
)
with col3:
st.metric(
"Average Rating",
f"{comparison['rating_analysis']['average']}/5"
)
with col4:
st.metric(
"Total Products",
f"{comparison['total_products']}"
)
# Detailed Comparison Table
st.header("πŸ“Š Product Comparison")
# Create comparison DataFrame
df_data = []
for product in products:
df_data.append({
'Product': product['name'],
'Price': f"${product['price']}",
'Rating': f"{product['rating']}/5",
'Reviews': f"{product.get('reviews_count', 0):,}",
'Brand': product['specs'].get('brand', 'N/A'),
'Store': product['source'],
'Shipping': product.get('shipping', 'Standard')
})
df = pd.DataFrame(df_data)
st.dataframe(df, use_container_width=True, hide_index=True)
# Store and Brand Distribution
col1, col2 = st.columns(2)
with col1:
st.subheader("πŸͺ Store Distribution")
store_dist = comparison.get('store_distribution', {})
if store_dist:
st.bar_chart(store_dist)
with col2:
st.subheader("🏷️ Brand Distribution")
brand_dist = comparison.get('brand_distribution', {})
if brand_dist:
st.bar_chart(brand_dist)
# Detailed Product Cards
st.header("πŸ“‹ Detailed Product Information")
# Show top 5 products in expandable cards
top_products = recommendations.get('top_3', products[:3])
for i, product in enumerate(top_products):
with st.expander(f"πŸ” {product['name']} - ${product['price']}", expanded=(i==0)):
col1, col2 = st.columns([2, 1])
with col1:
st.markdown(f"**Price:** ${product['price']}")
st.markdown(f"**Rating:** {product['rating']}/5 ({product.get('reviews_count', 0):,} reviews)")
st.markdown(f"**Store:** {product['source']}")
st.markdown(f"**Shipping:** {product.get('shipping', 'Standard shipping')}")
# Specifications
st.markdown("**Specifications:**")
specs_text = []
for key, value in product['specs'].items():
if key != 'brand': # Brand already shown above
specs_text.append(f"β€’ **{key.replace('_', ' ').title()}:** {value}")
st.markdown('\n'.join(specs_text))
with col2:
if 'composite_score' in product:
st.markdown("**Scoring Breakdown:**")
st.markdown(f"Overall Score: {product['composite_score']:.3f}")
st.markdown(f"Price Score: {product.get('price_score', 0):.3f}")
st.markdown(f"Rating Score: {product.get('rating_score', 0):.3f}")
st.markdown(f"Reviews Score: {product.get('reviews_score', 0):.3f}")
if st.button(f"πŸ›’ View Product", key=f"view_{i}"):
st.markdown(f"[Open in {product['source']}]({product['url']})")
# Export Section
st.header("πŸ“₯ Export Results")
col1, col2 = st.columns(2)
with col1:
# JSON Export
if st.button("πŸ“„ Download JSON Report"):
report_data = {
'search_query': results['query'],
'search_timestamp': datetime.now().isoformat(),
'preferences': results.get('preferences', ''),
'summary': {
'total_products': len(products),
'price_range': f"${comparison['price_analysis']['min']:.0f} - ${comparison['price_analysis']['max']:.0f}",
'average_rating': f"{comparison['rating_analysis']['average']:.1f}/5",
'best_overall': recommendations.get('best_overall', {}).get('name', 'N/A')
},
'products': products,
'detailed_analysis': comparison,
'recommendations': recommendations
}
st.download_button(
label="πŸ’Ύ Download Complete Report",
data=json.dumps(report_data, indent=2),
file_name=f"shopping_report_{results['query'].replace(' ', '_')}.json",
mime="application/json"
)
with col2:
# CSV Export
if st.button("πŸ“Š Download CSV"):
csv_data = pd.DataFrame(df_data)
st.download_button(
label="πŸ“ˆ Download Comparison CSV",
data=csv_data.to_csv(index=False),
file_name=f"product_comparison_{results['query'].replace(' ', '_')}.csv",
mime="text/csv"
)
# Add footer with instructions
def show_footer():
st.markdown("---")
st.markdown("""
<div style='text-align: center; color: #666;'>
<h4>πŸš€ Deploy on Hugging Face Spaces</h4>
<p>Save this as <code>app.py</code> and create a new Space with Streamlit SDK</p>
<p>No additional setup required - runs directly on HF Spaces!</p>
</div>
""", unsafe_allow_html=True)
if __name__ == "__main__":
main()
show_footer()
# Requirements.txt content for Hugging Face Spaces:
"""
streamlit==1.28.0
pandas==2.0.3
requests==2.31.0
beautifulsoup4==4.12.2
"""