Spaces:
Runtime error
Runtime error
| import os | |
| import re | |
| import ast | |
| import json | |
| import requests | |
| import pandas as pd | |
| import sqlite3 | |
| import logging | |
| from flask import ( | |
| Flask, | |
| request, | |
| jsonify, | |
| render_template, | |
| redirect, | |
| url_for, | |
| session | |
| ) | |
| from flask_session import Session | |
| from dotenv import load_dotenv | |
# Pull settings from a local .env file into the process environment.
load_dotenv()

# Logging: INFO level, with one module-scoped logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Data locations.
CSV_PATH = "All_Categories.csv"  # source catalogue (large CSV)
DB_PATH = "products.db"  # SQLite database file
TABLE_NAME = "products"

# The Gemini API key comes from the environment (e.g. the .env file); the
# module refuses to load without it.
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
if not GEMINI_API_KEY:
    _missing_key_message = "Gemini API key not found. Please set GEMINI_API_KEY in your .env file."
    logger.error(_missing_key_message)
    raise ValueError(_missing_key_message)

# Use a model name your account has access to; if this one is rejected,
# try "gemini-1.5-pro".
GEMINI_MODEL_NAME = "gemini-1.5-flash"
GEMINI_ENDPOINT = (
    "https://generativelanguage.googleapis.com/v1beta/models/"
    f"{GEMINI_MODEL_NAME}:generateContent"
)
def _csv_float(value, default=0.0):
    """Convert a CSV cell to float, returning *default* for empty or unparsable cells."""
    if not value:
        return default
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


def _csv_int(value, default=0):
    """Convert a CSV cell to int, tolerating thousands separators and "12.0"-style text."""
    if not value:
        return default
    text = str(value).replace(",", "").strip()
    try:
        return int(text)
    except ValueError:
        pass
    try:
        # Handles cells such as "12.0" that int() rejects.
        return int(float(text))
    except (ValueError, OverflowError):
        return default


def create_db_from_csv(csv_file, db_file):
    """Build the SQLite product table from a CSV file.

    Does nothing when ``db_file`` already exists. Otherwise the CSV is read
    in chunks of 50,000 rows and inserted into a freshly created table named
    by ``TABLE_NAME``. Columns missing from the CSV are stored as empty
    strings, and unparsable ``ratings`` / ``no_of_ratings`` cells are stored
    as 0.

    The database is built under a temporary name (``<db_file>.tmp``) and only
    renamed to ``db_file`` once every chunk has been inserted. A failed or
    interrupted build therefore never leaves behind a partial file that later
    runs would mistake for a complete database.

    Args:
        csv_file: Path of the source CSV.
        db_file: Path of the SQLite file to create.

    Raises:
        Whatever ``pandas.read_csv`` or ``sqlite3`` raise; the temporary file
        is removed before the exception propagates.
    """
    if os.path.exists(db_file):
        logger.info(f"Database '{db_file}' already exists. Skipping creation.")
        return

    logger.info(f"Creating SQLite DB from CSV: {csv_file} -> {db_file}")

    required_columns = [
        "name", "image", "link", "ratings", "no_of_ratings",
        "discount_price", "actual_price", "search_terms", "recommended_5", "category",
    ]
    create_sql = f"""
        CREATE TABLE {TABLE_NAME} (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT,
            image TEXT,
            link TEXT,
            ratings REAL,
            no_of_ratings INTEGER,
            discount_price TEXT,
            actual_price TEXT,
            search_terms TEXT,
            recommended_5 TEXT,
            category TEXT
        );
    """
    insert_sql = f"""
        INSERT INTO {TABLE_NAME}
            (name, image, link, ratings, no_of_ratings,
             discount_price, actual_price, search_terms,
             recommended_5, category)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """

    df_iter = pd.read_csv(csv_file, chunksize=50000)

    tmp_file = f"{db_file}.tmp"
    if os.path.exists(tmp_file):
        # Left over from an earlier failed build.
        os.remove(tmp_file)

    completed = False
    conn = sqlite3.connect(tmp_file)
    try:
        cur = conn.cursor()
        cur.execute(create_sql)
        conn.commit()

        for chunk_idx, chunk in enumerate(df_iter):
            logger.info(f"Processing chunk {chunk_idx}...")

            # Ensure all required columns are present.
            for col in required_columns:
                if col not in chunk.columns:
                    chunk[col] = ""
            chunk = chunk.fillna("")

            data_to_insert = [
                (
                    str(r["name"]),
                    str(r["image"]),
                    str(r["link"]),
                    _csv_float(r["ratings"]),
                    _csv_int(r["no_of_ratings"]),
                    str(r["discount_price"]),
                    str(r["actual_price"]),
                    str(r["search_terms"]),
                    str(r["recommended_5"]),
                    str(r["category"]),
                )
                for r in chunk.to_dict(orient="records")
            ]
            cur.executemany(insert_sql, data_to_insert)
            conn.commit()

        # Index after the bulk load: maintaining the indexes row by row
        # during the inserts is slower than building them once at the end.
        cur.execute(f"CREATE INDEX idx_name ON {TABLE_NAME}(name);")
        cur.execute(f"CREATE INDEX idx_category ON {TABLE_NAME}(category);")
        cur.execute(f"CREATE INDEX idx_discount_price ON {TABLE_NAME}(discount_price);")
        conn.commit()
        completed = True
    finally:
        conn.close()
        if not completed and os.path.exists(tmp_file):
            os.remove(tmp_file)

    # Publish the finished database under its real name in one step.
    os.replace(tmp_file, db_file)
    logger.info("Database creation complete.")
app = Flask(__name__)

# Never fall back to a hard-coded, publicly known secret. When
# FLASK_SECRET_KEY is not configured, use a random key for this process only
# and say so in the log. Set the variable explicitly when running several
# workers or when the key must survive restarts.
_flask_secret_key = os.getenv("FLASK_SECRET_KEY")
if not _flask_secret_key:
    logger.warning(
        "FLASK_SECRET_KEY is not set; using a random per-process secret key."
    )
    _flask_secret_key = os.urandom(32).hex()
app.secret_key = _flask_secret_key

# Session data is kept server-side on the filesystem.
app.config["SESSION_TYPE"] = "filesystem"
Session(app)
def index():
    """Render the home page template (the page with the search bar)."""
    # NOTE(review): no route decorator is visible on this view in the
    # reviewed source - confirm it is registered with the app.
    home_page = render_template("index.html")
    return home_page
def autocomplete():
    """Return up to 10 ``{"id", "name"}`` JSON objects whose name contains ``q``.

    ``q`` is read from the query string. An empty or missing ``q`` yields an
    empty JSON list. The match is a case-insensitive substring search on the
    ``name`` column; ``%`` and ``_`` typed by the user are matched literally
    rather than acting as SQL wildcards.
    """
    # NOTE(review): no route decorator is visible on this view in the
    # reviewed source - confirm it is registered with the app.
    q = request.args.get("q", "").strip()
    if not q:
        return jsonify([])

    # Escape LIKE metacharacters so user input cannot widen the pattern.
    escaped = q.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    sql = f"""
        SELECT id, name
        FROM {TABLE_NAME}
        WHERE LOWER(name) LIKE LOWER(?) ESCAPE '\\'
        LIMIT 10
    """

    conn = sqlite3.connect(DB_PATH)
    try:
        rows = conn.execute(sql, (f"%{escaped}%",)).fetchall()
    finally:
        # Close the connection even when the query raises.
        conn.close()

    return jsonify([{"id": row_id, "name": name} for row_id, name in rows])
def show_product(item_id):
    """Render the detail page for one product plus up to five recommendations.

    The product is looked up by ``id``. Its ``recommended_5`` column is parsed
    as a Python list literal, and each of the first five entries is resolved
    to the first row whose name contains that entry. Entries with no matching
    row are skipped.

    Args:
        item_id: Primary key of the product to show.

    Returns:
        The rendered ``product.html`` template, or a small HTML message with
        status 404 when no row has that id.
    """
    # NOTE(review): no route decorator is visible on this view in the
    # reviewed source - confirm how item_id is supplied by the route.
    product_columns = (
        "id", "name", "image", "link", "ratings", "no_of_ratings",
        "discount_price", "actual_price", "search_terms", "recommended_5", "category",
    )
    rec_columns = ("id", "name", "image", "link", "discount_price")

    conn = sqlite3.connect(DB_PATH)
    try:
        cur = conn.cursor()

        # Name the columns explicitly so the mapping below does not depend
        # on the physical column order of the table.
        sql = f"SELECT {', '.join(product_columns)} FROM {TABLE_NAME} WHERE id=?"
        row = cur.execute(sql, (item_id,)).fetchone()
        if not row:
            return "<h2>Product not found</h2>", 404
        product = dict(zip(product_columns, row))

        # recommended_5 is stored as text; anything that is not a valid list
        # literal is treated as "no recommendations".
        try:
            rec_list = ast.literal_eval(product["recommended_5"])
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            rec_list = []
        if not isinstance(rec_list, list):
            rec_list = []

        sql_rec = (
            f"SELECT {', '.join(rec_columns)} FROM {TABLE_NAME} "
            "WHERE name LIKE ? LIMIT 1"
        )
        recommended_details = []
        for rec_name in rec_list[:5]:
            rec_row = cur.execute(sql_rec, (f"%{rec_name}%",)).fetchone()
            if rec_row:
                recommended_details.append(dict(zip(rec_columns, rec_row)))
    finally:
        # Close the connection on every path, including errors.
        conn.close()

    return render_template(
        "product.html",
        product=product,
        recommended=recommended_details,
    )
def rag_index():
    """Render the RAG chat page with the history kept in ``session['rag_chat']``.

    An empty history is stored in the session first if none exists yet.
    """
    # NOTE(review): no route decorator is visible on this view in the
    # reviewed source - confirm it is registered with the app.
    try:
        history = session["rag_chat"]
    except KeyError:
        history = session["rag_chat"] = []
    return render_template("rag.html", chat_history=history)
def rag_query():
    """Handle one chat turn: look up products, ask Gemini, and re-render the chat.

    Reads ``rag_input`` from the submitted form. Blank input redirects back
    to the chat page. Otherwise the user message is appended to the session
    history, the query is turned into database filters, the matching rows are
    summarised into a context string, and the whole conversation plus that
    context is sent to Gemini. The reply is appended to the history.
    """
    # NOTE(review): no route decorator is visible on this view in the
    # reviewed source - confirm it is registered with the app.
    try:
        history = session["rag_chat"]
    except KeyError:
        history = session["rag_chat"] = []

    user_input = request.form.get("rag_input", "").strip()
    if not user_input:
        return redirect(url_for("rag_index"))

    # Record the user's turn before building the prompt so it is included.
    history.append(("user", user_input))

    # Query -> filters -> matching rows -> textual context.
    filters = extract_query_parameters(user_input)
    matched_items = filter_database(*filters)
    db_context = build_db_context(matched_items, *filters)

    # Ask Gemini with the full conversation and the database context.
    gemini_response = gemini_generate_content(
        api_key=GEMINI_API_KEY,
        conversation_text=construct_prompt(history, db_context),
    )
    history.append(("assistant", gemini_response))

    # The list was mutated in place, so flag the session as changed.
    session.modified = True

    return render_template("rag.html", chat_history=session["rag_chat"])
def extract_query_parameters(user_query):
    """Extract a brand keyword, a product type and a price cap from a query.

    * Price: taken from phrases such as "under 5000", "below 25k" or
      "under ₹5,000"; a trailing ``k`` multiplies by 1000.
    * Product type: the first distinct ``category`` value found as a
      substring of the query, otherwise the first distinct ``search_terms``
      value found that way.
    * Brand: a heuristic. Every whitespace-separated word of the stored
      ``search_terms`` values is a candidate; the longest candidate that
      appears in the query as a whole word wins (ties broken
      lexicographically), so the result is the same on every run.

    Empty or blank database values are ignored. An empty string is a
    substring of every query and would otherwise end the search with a
    useless match.

    Args:
        user_query: Raw text typed by the user.

    Returns:
        ``(brand, product_type, price)``. Each element is ``None`` when
        nothing was found; ``price`` is an ``int`` otherwise.
    """
    user_lower = user_query.lower()

    # Price cap: "under"/"below", optional rupee sign, digits with optional
    # thousands separators, optional "k" suffix.
    price = None
    price_match = re.search(r'(under|below)\s+₹?(\d+(?:,\d+)*)(k?)', user_lower)
    if price_match:
        price = int(price_match.group(2).replace(",", ""))
        if price_match.group(3):
            price *= 1000

    # Keyword lists come from the data itself.
    conn = sqlite3.connect(DB_PATH)
    try:
        cur = conn.cursor()
        cur.execute(f"SELECT DISTINCT category FROM {TABLE_NAME}")
        categories = [
            row[0].strip().lower() for row in cur.fetchall()
            if row[0] and row[0].strip()
        ]
        cur.execute(f"SELECT DISTINCT search_terms FROM {TABLE_NAME}")
        search_terms = [
            row[0].strip().lower() for row in cur.fetchall()
            if row[0] and row[0].strip()
        ]
    finally:
        # Close the connection even when a query raises.
        conn.close()

    # Product type: categories first, then search terms.
    product_type = next((c for c in categories if c in user_lower), None)
    if product_type is None:
        product_type = next((t for t in search_terms if t in user_lower), None)

    # Brand: whole-word matches only. The cheap substring test runs first so
    # the regex is only evaluated for plausible candidates.
    possible_brands = {word for term in search_terms for word in term.split()}
    brand_matches = [
        word for word in possible_brands
        if word in user_lower
        and re.search(rf"(?<!\w){re.escape(word)}(?!\w)", user_lower)
    ]
    brand = max(brand_matches, key=lambda word: (len(word), word), default=None)

    return brand, product_type, price
def filter_database(brand, product_type, price):
    """Return up to 5000 product rows matching the given filters.

    Args:
        brand: Keyword that must occur in the product name, or a falsy value
            to skip this filter.
        product_type: Keyword that must occur in the category, or a falsy
            value to skip this filter.
        price: Upper bound for the discount price, or ``None`` to skip this
            filter.

    Returns:
        A list of ``(id, name, discount_price, recommended_5)`` tuples.

    Notes:
        ``discount_price`` is stored as text (assumed to look like
        "₹1,299"). The rupee sign and commas are removed before the numeric
        comparison, and rows whose price does not start with a digit (for
        example, empty prices) are excluded from price-filtered results
        instead of being treated as costing 0.
    """
    def like_pattern(keyword):
        # Escape LIKE metacharacters so keywords are matched literally.
        escaped = (
            str(keyword)
            .replace("\\", "\\\\")
            .replace("%", "\\%")
            .replace("_", "\\_")
        )
        return f"%{escaped}%"

    conditions = []
    params = []

    if brand:
        conditions.append("LOWER(name) LIKE ? ESCAPE '\\'")
        params.append(like_pattern(brand))

    if product_type:
        conditions.append("LOWER(category) LIKE ? ESCAPE '\\'")
        params.append(like_pattern(product_type))

    if price is not None:
        numeric_price = "TRIM(REPLACE(REPLACE(discount_price, '₹', ''), ',', ''))"
        # Without this guard, empty or non-numeric prices CAST to 0 and
        # would pass every "under X" filter.
        conditions.append(f"{numeric_price} GLOB '[0-9]*'")
        conditions.append(f"CAST({numeric_price} AS INTEGER) <= ?")
        params.append(price)

    sql = f"SELECT id, name, discount_price, recommended_5 FROM {TABLE_NAME}"
    if conditions:
        sql += " WHERE " + " AND ".join(conditions)
    # Limit to 5000 for performance; adjust as needed.
    sql += " LIMIT 5000"

    conn = sqlite3.connect(DB_PATH)
    try:
        return conn.execute(sql, tuple(params)).fetchall()
    finally:
        # Close the connection even when the query raises.
        conn.close()
def build_db_context(matched_items, brand, product_type, price):
    """Summarise matched database rows as text for the model prompt.

    Args:
        matched_items: Sequence of rows where index 1 is the product name and
            index 2 is the stored discount price.
        brand: Brand filter that was applied (only its truthiness is used).
        product_type: Product-type filter that was applied (only its
            truthiness is used).
        price: Price cap that was applied; mentioned in the header when
            truthy.

    Returns:
        A header line followed by at most 10 ``- <name> at ₹<price>`` lines,
        or a single "no matching items" line when ``matched_items`` is empty.
        The result always ends with a newline.

    Notes:
        Stored prices may already carry the rupee sign, so any leading "₹" is
        removed before the sign is added, avoiding "₹₹1,299". Rows without a
        price are listed as "(price unavailable)" rather than "at ₹".
    """
    if not matched_items:
        return "No matching items found in the database.\n"

    header = f"Found {len(matched_items)} items"
    if price:
        header += f" under ₹{price}"
    if brand or product_type:
        header += " matching your criteria"
    lines = [header + ":"]

    # List up to 10 items for context.
    for item in matched_items[:10]:
        item_name = item[1]
        raw_price = "" if item[2] is None else str(item[2])
        price_text = raw_price.strip().lstrip("₹").strip()
        if price_text:
            lines.append(f"- {item_name} at ₹{price_text}")
        else:
            lines.append(f"- {item_name} (price unavailable)")

    return "\n".join(lines) + "\n"
def construct_prompt(chat_history, db_context):
    """Assemble the text prompt sent to Gemini.

    The prompt is made of a fixed instruction, one ``Speaker: message`` line
    per ``(speaker, message)`` pair in ``chat_history`` (speaker name
    capitalised), the database context, and a closing instruction.
    """
    pieces = [
        "You are an intelligent assistant that provides product recommendations based on the user's query and the available database.\n\n",
        "Conversation so far:\n",
    ]
    pieces.extend(
        f"{speaker.capitalize()}: {message}\n"
        for speaker, message in chat_history
    )
    pieces.append(f"\nDatabase Context:\n{db_context}\n")
    pieces.append(
        "Based on the above information, provide a helpful and concise answer to the user's query."
    )
    return "".join(pieces)
def gemini_generate_content(api_key, conversation_text):
    """Send the prompt to Gemini's ``generateContent`` endpoint and return the reply text.

    This function does not raise for transport or API failures. It logs the
    problem and returns a human-readable error string instead, which the
    caller stores in the chat history like any other reply.

    Args:
        api_key: Gemini API key. It is sent in the ``x-goog-api-key`` header
            rather than the URL, so it cannot end up in exception messages,
            logs or the text returned to the user.
        conversation_text: Full prompt text.

    Returns:
        The ``text`` of the first part of the first candidate, or an error or
        debug string when the request fails or the response has an unexpected
        shape.
    """
    payload = {
        "contents": [
            {
                "parts": [{"text": conversation_text}]
            }
        ]
    }
    headers = {
        "Content-Type": "application/json",
        "x-goog-api-key": api_key,
    }

    try:
        # (connect, read) timeouts: without them a stalled connection would
        # block the request handler indefinitely.
        resp = requests.post(
            GEMINI_ENDPOINT,
            headers=headers,
            json=payload,
            timeout=(10, 60),
        )
    except requests.RequestException as e:
        logger.error(f"Error during Gemini API request: {e}")
        return f"[Gemini Error] Failed to connect to Gemini API: {e}"

    try:
        data = resp.json()
    except ValueError as e:
        logger.error(
            f"Invalid JSON response from Gemini API (HTTP {resp.status_code}): {e}"
        )
        return f"[Gemini Error] Invalid JSON response: {e}"

    if resp.status_code != 200:
        logger.error(f"Gemini API returned error {resp.status_code}: {data}")
        return f"[Gemini Error {resp.status_code}] {json.dumps(data, indent=2)}"

    # Parse the "candidates" structure; a non-object body has none.
    candidates = data.get("candidates", []) if isinstance(data, dict) else []
    if not candidates:
        logger.error(f"No candidates received from Gemini API: {data}")
        return f"No candidates received. Debug JSON: {json.dumps(data, indent=2)}"

    content = candidates[0].get("content", {})
    parts = content.get("parts", [])
    if not parts:
        logger.error(f"No 'parts' found in candidate content: {data}")
        return f"No 'parts' found in candidate content. Debug JSON: {json.dumps(data, indent=2)}"

    assistant_reply = parts[0].get("text", "(No text found in the response)")
    logger.info(f"Gemini Assistant Reply: {assistant_reply}")
    return assistant_reply
def main():
    """Create the product database if it is missing, then start the Flask server.

    The server listens on all interfaces (0.0.0.0) on port 7860, and the
    startup log message reports that same address.
    """
    host, port = "0.0.0.0", 7860
    create_db_from_csv(CSV_PATH, DB_PATH)
    logger.info(f"Starting Flask server at http://{host}:{port}")
    app.run(host=host, port=port)


if __name__ == "__main__":
    main()