import csv from collections import Counter _cached_orders = None _cached_products = None def read_csv_data(): global _cached_orders if _cached_orders is not None: return _cached_orders orders = [] try: with open('Ecommerce_data.csv', 'r', encoding='utf-8-sig') as file: reader = csv.DictReader(file) for row in reader: orders.append({ "order_id": str(row.get('order_id', '')).strip(), "customer_name": str(row.get('customer_name', 'Customer')).strip(), "product_name": str(row.get('Buy', 'Product')).strip(), "mobile": str(row.get('Mobile', '')).strip(), "category": str(row.get('Category', 'Product')).strip(), "address": str(row.get('customer_address', 'Address')).strip(), "discount": str(row.get('discount_percentage', '0')).strip(), "status": "Delivered" }) _cached_orders = orders except Exception as e: return [] return orders def lookup_order(order_id): order_id_str = str(order_id).strip() for order in read_csv_data(): if str(order["order_id"]).strip() == order_id_str: return {"status": "success", "order": [order]} return {"status": "not_found", "message": f"No order found with ID {order_id}"} def last_orders(mobile_number): mobile_str = str(mobile_number).strip() matching_orders = [order for order in read_csv_data() if str(order["mobile"]).strip() == mobile_str] return {"status": "success", "orders": matching_orders[-10:]} if matching_orders else {"status": "not_found", "message": f"No orders found for mobile number {mobile_number}"} def handle_general_query(query): query_lower = query.lower() # Direct responses for common queries if any(word in query_lower for word in ['service', 'provide', 'offering']): return "We provide online shopping for electronics, clothing, furniture, books, toys, sports, beauty products and groceries with amazing discounts. We also offer fast delivery, easy returns, and secure payments." elif any(word in query_lower for word in ['payment', 'pay', 'method']): return "We accept credit cards, debit cards, UPI, net banking, and cash on delivery for your convenience." elif any(word in query_lower for word in ['delivery', 'shipping']): return "We provide fast delivery across India with tracking facility. Standard delivery takes 3-5 business days." elif any(word in query_lower for word in ['discount', 'offer']): return "We offer amazing discounts up to 30% on all categories. Check our website for current offers and deals." elif any(word in query_lower for word in ['return', 'refund']): return "We have easy return policy within 7 days of delivery. Refunds are processed within 5-7 working days." elif any(word in query_lower for word in ['website', 'site', 'link', 'contact', 'email']): return "Visit www.sparkmart.com or contact sparkmartai@gmail.com for support." elif any(word in query_lower for word in ['next offer', 'upcoming', 'future']): return "Visit www.sparkmart.com for upcoming offers or contact sparkmartai@gmail.com." else: return "I can help with information about payments, delivery, returns, discounts, products, or services. What would you like to know?" def read_product_data(): global _cached_products if _cached_products is not None: return _cached_products products = [] try: with open('Ecommerce_data.csv', 'r', encoding='utf-8-sig') as file: reader = csv.DictReader(file) for row in reader: try: discount_val = int(row.get('discount_percentage', 0)) except (ValueError, TypeError): discount_val = 0 products.append({ "product_name": str(row.get('Buy', 'Product')).strip(), "category": str(row.get('Category', 'General')).strip(), "discount": discount_val, "customer_name": str(row.get('customer_name', 'Customer')).strip(), "mobile": str(row.get('Mobile', '')).strip(), "gender": str(row.get('Gender', '')).strip() }) _cached_products = products except Exception as e: return [] return products def recommend_products(query_type="popular", category=None, mobile_number=None, limit=5): try: if query_type == "category" and category: products = read_product_data() category_products = [p for p in products if p['category'].lower() == category.lower()] unique_products = {} for product in category_products: name = product['product_name'] if name not in unique_products or product['discount'] > unique_products[name]['discount']: unique_products[name] = product recommendations = sorted(unique_products.values(), key=lambda x: x['discount'], reverse=True)[:limit] if recommendations: return {"status": "success", "type": "category", "category": category, "recommendations": recommendations} elif query_type == "personalized" and mobile_number: products = read_product_data() user_purchases = [p for p in products if p['mobile'] == str(mobile_number).strip()] if not user_purchases: product_counts = Counter(p['product_name'] for p in products) popular_items = [] for product_name, count in product_counts.most_common(limit): product_variants = [p for p in products if p['product_name'] == product_name] best_variant = max(product_variants, key=lambda x: x['discount']) popular_items.append({**best_variant, 'popularity_count': count}) return {"status": "success", "type": "personalized", "recommendations": popular_items} user_gender = user_purchases[0].get('gender', '') user_categories = [p['category'] for p in user_purchases] category_counts = Counter(user_categories) recommendations = [] for category, _ in category_counts.most_common(3): category_products = [p for p in products if p['category'].lower() == category.lower()] unique_products = {} for product in category_products: name = product['product_name'] if name not in unique_products or product['discount'] > unique_products[name]['discount']: unique_products[name] = product category_recs = sorted(unique_products.values(), key=lambda x: x['discount'], reverse=True)[:2] if user_gender and category.lower() == 'beauty': if user_gender.lower() == 'male': category_recs = [r for r in category_recs if any(item in r['product_name'].lower() for item in ['perfume', 'moisturizer'])] user_products = {p['product_name'] for p in user_purchases} new_recs = [r for r in category_recs if r['product_name'] not in user_products] recommendations.extend(new_recs) return {"status": "success", "type": "personalized", "recommendations": recommendations[:limit]} else: products = read_product_data() product_counts = Counter(p['product_name'] for p in products) popular_items = [] for product_name, count in product_counts.most_common(limit): product_variants = [p for p in products if p['product_name'] == product_name] best_variant = max(product_variants, key=lambda x: x['discount']) popular_items.append({**best_variant, 'popularity_count': count}) return {"status": "success", "type": "popular", "recommendations": popular_items} except Exception as e: return {"status": "error", "message": f"Recommendation error: {str(e)}"} def format_recommendations(recommendations_data): if recommendations_data["status"] != "success": return "Sorry, I couldn't generate recommendations right now. Please try again later." recs = recommendations_data["recommendations"] if not recs: return "No recommendations available at the moment. Please check our website for latest products." rec_type = recommendations_data["type"] if rec_type == "category": header = f"Here are top {recommendations_data['category']} products with great discounts:\n\n" elif rec_type == "personalized": header = "Based on your purchase history, here are some products you might like:\n\n" else: header = "Here are our most popular products with amazing discounts:\n\n" formatted_recs = [f"{rec['product_name']} ({rec['category']}) - {rec['discount']}% OFF" for rec in recs] return header + "\n".join(formatted_recs) + "\nWould you like more details about any of these products or recommendations from a specific category?"