Spaces:
Sleeping
Sleeping
| import csv | |
| from collections import Counter | |
# Module-level caches for the parsed CSV rows. ``None`` means "not loaded
# yet"; each loader below stores its list here after the first successful
# read and returns the stored list on later calls.
_cached_orders = None  # set by read_csv_data()
_cached_products = None  # set by read_product_data()
def read_csv_data():
    """Load all order rows from ``Ecommerce_data.csv``, caching the result.

    The file is parsed at most once per successful load: the resulting list
    is stored in the module-level ``_cached_orders`` and returned as-is on
    later calls.

    Returns:
        A list with one dict per CSV row. Keys are ``order_id``,
        ``customer_name``, ``product_name``, ``mobile``, ``category``,
        ``address``, ``discount`` and ``status``; every value is a stripped
        string and ``status`` is always ``"Delivered"``. An empty list is
        returned when the file cannot be opened, decoded or parsed; that
        failure is not cached, so the next call tries again.
    """
    global _cached_orders
    if _cached_orders is not None:
        return _cached_orders

    def text(row, column, default):
        # DictReader fills the missing cells of a short row with None, which
        # str() would turn into the literal text "None"; use the default.
        value = row.get(column)
        return str(default if value is None else value).strip()

    try:
        # newline='' lets the csv module handle line endings itself, as its
        # documentation recommends (matters for quoted multi-line fields).
        with open('Ecommerce_data.csv', 'r', encoding='utf-8-sig', newline='') as file:
            orders = [
                {
                    "order_id": text(row, 'order_id', ''),
                    "customer_name": text(row, 'customer_name', 'Customer'),
                    "product_name": text(row, 'Buy', 'Product'),
                    "mobile": text(row, 'Mobile', ''),
                    "category": text(row, 'Category', 'Product'),
                    "address": text(row, 'customer_address', 'Address'),
                    "discount": text(row, 'discount_percentage', '0'),
                    "status": "Delivered",
                }
                for row in csv.DictReader(file)
            ]
    except (OSError, UnicodeError, csv.Error):
        # Best-effort loader: an unreadable or malformed file means "no orders".
        return []

    _cached_orders = orders
    return orders
def lookup_order(order_id):
    """Find the first order whose ID equals ``order_id``.

    Both sides of the comparison are converted to ``str`` and stripped, so
    ``101``, ``"101"`` and ``" 101 "`` all match the same row.

    Args:
        order_id: The ID to look for (any type convertible with ``str``).

    Returns:
        ``{"status": "success", "order": [row]}`` for the first matching row
        from ``read_csv_data()``, otherwise
        ``{"status": "not_found", "message": ...}``.
    """
    not_found = {"status": "not_found", "message": f"No order found with ID {order_id}"}

    # A missing or blank ID must never match: without this guard, None would
    # be compared as the text "None" and "" would match rows without an ID.
    if order_id is None:
        return not_found
    wanted = str(order_id).strip()
    if not wanted:
        return not_found

    match = next(
        (order for order in read_csv_data() if str(order["order_id"]).strip() == wanted),
        None,
    )
    if match is None:
        return not_found
    return {"status": "success", "order": [match]}
def last_orders(mobile_number, limit=10):
    """Return the most recent orders placed with a mobile number.

    "Most recent" means the last rows in file order, as produced by
    ``read_csv_data()``.

    Args:
        mobile_number: Number to match; compared as a stripped string.
        limit: Maximum number of orders to return (default 10, the previously
            hard-coded value). A value of zero or less yields no orders.

    Returns:
        ``{"status": "success", "orders": [...]}`` when at least one order
        matches, otherwise ``{"status": "not_found", "message": ...}``.
    """
    not_found = {
        "status": "not_found",
        "message": f"No orders found for mobile number {mobile_number}",
    }

    # A missing or blank number would otherwise match every row that simply
    # has no mobile recorded.
    if mobile_number is None:
        return not_found
    wanted = str(mobile_number).strip()
    if not wanted:
        return not_found

    matching = [
        order for order in read_csv_data()
        if str(order["mobile"]).strip() == wanted
    ]
    if not matching:
        return not_found

    # matching[-0:] would be the whole list, so handle non-positive limits
    # explicitly.
    recent = matching[-limit:] if limit > 0 else []
    return {"status": "success", "orders": recent}
def handle_general_query(query):
    """Answer a free-text store question with a canned reply.

    Matching is a case-insensitive substring search, checked topic by topic
    in a fixed priority order: services, payments, delivery, discounts and
    offers, returns, website/contact, upcoming offers. The first topic that
    matches wins.

    Args:
        query: The user's question. ``None`` is treated as an empty question.

    Returns:
        The reply string for the first matching topic, or a generic help
        message when nothing matches.
    """
    text = "" if query is None else str(query).lower()

    def mentions(*keywords):
        return any(keyword in text for keyword in keywords)

    upcoming_reply = "Visit www.sparkmart.com for upcoming offers or contact sparkmartai@gmail.com."

    if mentions('service', 'provide', 'offering'):
        return "We provide online shopping for electronics, clothing, furniture, books, toys, sports, beauty products and groceries with amazing discounts. We also offer fast delivery, easy returns, and secure payments."
    if mentions('payment', 'pay', 'method'):
        return "We accept credit cards, debit cards, UPI, net banking, and cash on delivery for your convenience."
    if mentions('delivery', 'shipping'):
        return "We provide fast delivery across India with tracking facility. Standard delivery takes 3-5 business days."
    if mentions('discount', 'offer'):
        # "next offer" and "upcoming offers" contain "offer", so this branch
        # used to swallow them and the upcoming-offers reply could never be
        # reached for those phrases. Route them explicitly.
        if mentions('next offer', 'upcoming', 'future'):
            return upcoming_reply
        return "We offer amazing discounts up to 30% on all categories. Check our website for current offers and deals."
    if mentions('return', 'refund'):
        return "We have easy return policy within 7 days of delivery. Refunds are processed within 5-7 working days."
    if mentions('website', 'site', 'link', 'contact', 'email'):
        return "Visit www.sparkmart.com or contact sparkmartai@gmail.com for support."
    if mentions('next offer', 'upcoming', 'future'):
        return upcoming_reply
    return "I can help with information about payments, delivery, returns, discounts, products, or services. What would you like to know?"
def read_product_data():
    """Load product/purchase rows from ``Ecommerce_data.csv``, caching the result.

    The file is parsed at most once per successful load: the resulting list
    is stored in the module-level ``_cached_products`` and returned as-is on
    later calls.

    Returns:
        A list with one dict per CSV row. Keys are ``product_name``,
        ``category``, ``customer_name``, ``mobile`` and ``gender`` (stripped
        strings) plus ``discount`` (an ``int``; 0 when the cell is missing or
        not numeric, decimals are truncated). An empty list is returned when
        the file cannot be opened, decoded or parsed; that failure is not
        cached, so the next call tries again.
    """
    global _cached_products
    if _cached_products is not None:
        return _cached_products

    def text(row, column, default):
        # DictReader fills the missing cells of a short row with None, which
        # str() would turn into the literal text "None"; use the default.
        value = row.get(column)
        return str(default if value is None else value).strip()

    def discount(row):
        raw = row.get('discount_percentage', 0)
        try:
            return int(raw)
        except (ValueError, TypeError):
            pass
        # Accept decimal percentages such as "12.5" instead of silently
        # treating them as 0.
        try:
            return int(float(raw))
        except (ValueError, TypeError, OverflowError):
            return 0

    try:
        # newline='' lets the csv module handle line endings itself, as its
        # documentation recommends (matters for quoted multi-line fields).
        with open('Ecommerce_data.csv', 'r', encoding='utf-8-sig', newline='') as file:
            products = [
                {
                    "product_name": text(row, 'Buy', 'Product'),
                    "category": text(row, 'Category', 'General'),
                    "discount": discount(row),
                    "customer_name": text(row, 'customer_name', 'Customer'),
                    "mobile": text(row, 'Mobile', ''),
                    "gender": text(row, 'Gender', ''),
                }
                for row in csv.DictReader(file)
            ]
    except (OSError, UnicodeError, csv.Error):
        # Best-effort loader: an unreadable or malformed file means "no products".
        return []

    _cached_products = products
    return products
def _best_row_per_product(products):
    """Map each product name to its highest-discount row (first row wins ties)."""
    best = {}
    for product in products:
        name = product['product_name']
        if name not in best or product['discount'] > best[name]['discount']:
            best[name] = product
    return best


def _ranked_by_discount(products):
    """Return one row per product name, highest discount first."""
    return sorted(
        _best_row_per_product(products).values(),
        key=lambda row: row['discount'],
        reverse=True,
    )


def _most_purchased(products, limit):
    """Return the ``limit`` most frequent products with a ``popularity_count``."""
    counts = Counter(product['product_name'] for product in products)
    best = _best_row_per_product(products)
    return [
        {**best[name], 'popularity_count': count}
        for name, count in counts.most_common(limit)
    ]


def recommend_products(query_type="popular", category=None, mobile_number=None, limit=5):
    """Build product recommendations from the rows of ``read_product_data()``.

    Args:
        query_type: ``"category"``, ``"personalized"`` or anything else for
            the overall most-purchased list.
        category: Category name (case-insensitive); used when ``query_type``
            is ``"category"``.
        mobile_number: Customer's mobile number; used when ``query_type`` is
            ``"personalized"``.
        limit: Maximum number of recommendations.

    Returns:
        A dict with ``status`` ``"success"``, a ``type`` and a
        ``recommendations`` list (plus ``category`` for category queries),
        or ``{"status": "error", "message": ...}`` if anything raises.

        * category: the best-discount row of each product in the category,
          highest discount first. The list is empty when nothing matches.
        * personalized: up to two top-discount products from each of the
          customer's three most-bought categories, excluding products they
          already bought. Customers with no purchase history get the
          most-purchased list instead (still typed ``"personalized"``).
        * popular: most-purchased products, each with ``popularity_count``.
    """
    try:
        products = read_product_data()

        if query_type == "category" and category:
            wanted = category.lower()
            in_category = [p for p in products if p['category'].lower() == wanted]
            # Always return a result dict: an unknown or empty category used
            # to fall off the end of the function and return None.
            return {
                "status": "success",
                "type": "category",
                "category": category,
                "recommendations": _ranked_by_discount(in_category)[:limit],
            }

        if query_type == "personalized" and mobile_number:
            mobile = str(mobile_number).strip()
            purchases = [p for p in products if p['mobile'] == mobile]
            if not purchases:
                return {
                    "status": "success",
                    "type": "personalized",
                    "recommendations": _most_purchased(products, limit),
                }

            gender = purchases[0].get('gender', '')
            already_bought = {p['product_name'] for p in purchases}
            favourite_categories = Counter(p['category'] for p in purchases)

            recommendations = []
            for favourite, _ in favourite_categories.most_common(3):
                key = favourite.lower()
                picks = _ranked_by_discount(
                    [p for p in products if p['category'].lower() == key]
                )[:2]
                # Male customers only get perfume/moisturizer from beauty.
                if gender and gender.lower() == 'male' and key == 'beauty':
                    picks = [
                        pick for pick in picks
                        if any(item in pick['product_name'].lower()
                               for item in ('perfume', 'moisturizer'))
                    ]
                recommendations.extend(
                    pick for pick in picks
                    if pick['product_name'] not in already_bought
                )
            return {
                "status": "success",
                "type": "personalized",
                "recommendations": recommendations[:limit],
            }

        return {
            "status": "success",
            "type": "popular",
            "recommendations": _most_purchased(products, limit),
        }
    except Exception as e:
        # Boundary handler: callers receive an error payload, never a raise.
        return {"status": "error", "message": f"Recommendation error: {str(e)}"}
def format_recommendations(recommendations_data):
    """Render a ``recommend_products`` result as user-facing text.

    Args:
        recommendations_data: Result dict with ``status``, ``type`` and
            ``recommendations`` (and ``category`` for category results).
            ``None`` or any non-dict value is treated as a failed result.

    Returns:
        An apology when the result is missing or not successful, a
        "no recommendations" notice when the list is empty, otherwise a
        header chosen by result type followed by one
        ``name (category) - N% OFF`` line per product and a closing question.
    """
    # Tolerate None / malformed payloads instead of raising TypeError or
    # KeyError while building a chat reply.
    if not isinstance(recommendations_data, dict) or recommendations_data.get("status") != "success":
        return "Sorry, I couldn't generate recommendations right now. Please try again later."

    recs = recommendations_data.get("recommendations")
    if not recs:
        return "No recommendations available at the moment. Please check our website for latest products."

    rec_type = recommendations_data.get("type")
    category = recommendations_data.get("category")
    if rec_type == "category" and category:
        header = f"Here are top {category} products with great discounts:\n\n"
    elif rec_type == "personalized":
        header = "Based on your purchase history, here are some products you might like:\n\n"
    else:
        header = "Here are our most popular products with amazing discounts:\n\n"

    lines = [
        f"{rec['product_name']} ({rec['category']}) - {rec['discount']}% OFF"
        for rec in recs
    ]
    return header + "\n".join(lines) + "\nWould you like more details about any of these products or recommendations from a specific category?"