# app.py — Flask product-search demo with a RAG chat backed by the Gemini API.
import os
import re
import ast
import json
import requests
import pandas as pd
import sqlite3
import logging
from flask import (
Flask,
request,
jsonify,
render_template,
redirect,
url_for,
session
)
from flask_session import Session
from dotenv import load_dotenv
# Load environment variables from a .env file
load_dotenv()

# Configure Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

CSV_PATH = "All_Categories.csv"  # Path to your large CSV
DB_PATH = "products.db"  # SQLite database file
TABLE_NAME = "products"  # Single table holding every product row

# Securely load your Gemini API key from environment variables
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")  # Ensure you set this in your .env file
# Fail fast at import time: the RAG chat cannot function without a key.
if not GEMINI_API_KEY:
    logger.error("Gemini API key not found. Please set GEMINI_API_KEY in your .env file.")
    raise ValueError("Gemini API key not found. Please set GEMINI_API_KEY in your .env file.")

# Replace with the correct model name your account has access to
GEMINI_MODEL_NAME = "gemini-1.5-flash"  # If invalid, try "gemini-1.5-pro"
GEMINI_ENDPOINT = f"https://generativelanguage.googleapis.com/v1beta/models/{GEMINI_MODEL_NAME}:generateContent"
def create_db_from_csv(csv_file, db_file):
    """Build the SQLite products database from a (possibly large) CSV.

    Streams the CSV in 50k-row chunks so arbitrarily large files fit in
    memory, creates the products table plus search indexes, and bulk-inserts
    every row.  No-op when *db_file* already exists.

    Args:
        csv_file: Path to the source CSV.
        db_file:  Path of the SQLite database file to create.
    """
    if os.path.exists(db_file):
        logger.info(f"Database '{db_file}' already exists. Skipping creation.")
        return
    logger.info(f"Creating SQLite DB from CSV: {csv_file} -> {db_file}")

    # Loop-invariant: built once instead of once per chunk (original rebuilt both).
    required_columns = [
        "name", "image", "link", "ratings", "no_of_ratings",
        "discount_price", "actual_price", "search_terms",
        "recommended_5", "category",
    ]
    insert_sql = f"""
    INSERT INTO {TABLE_NAME}
    (name, image, link, ratings, no_of_ratings,
     discount_price, actual_price, search_terms,
     recommended_5, category)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """

    conn = sqlite3.connect(db_file)
    try:
        cur = conn.cursor()
        cur.execute(f"DROP TABLE IF EXISTS {TABLE_NAME}")
        cur.execute(f"""
        CREATE TABLE {TABLE_NAME} (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT,
            image TEXT,
            link TEXT,
            ratings REAL,
            no_of_ratings INTEGER,
            discount_price TEXT,
            actual_price TEXT,
            search_terms TEXT,
            recommended_5 TEXT,
            category TEXT
        );
        """)
        # Indexes optimize the autocomplete / filter queries used by the routes.
        cur.execute(f"CREATE INDEX idx_name ON {TABLE_NAME}(name);")
        cur.execute(f"CREATE INDEX idx_category ON {TABLE_NAME}(category);")
        cur.execute(f"CREATE INDEX idx_discount_price ON {TABLE_NAME}(discount_price);")
        conn.commit()

        for chunk_idx, chunk in enumerate(pd.read_csv(csv_file, chunksize=50000)):
            logger.info(f"Processing chunk {chunk_idx}...")
            # Guarantee every expected column exists; blank-fill missing cells.
            for col in required_columns:
                if col not in chunk.columns:
                    chunk[col] = ""
            chunk.fillna("", inplace=True)
            data_to_insert = [_clean_row(r) for r in chunk.to_dict(orient="records")]
            cur.executemany(insert_sql, data_to_insert)
            conn.commit()
    finally:
        # Close the handle even if a chunk fails mid-insert
        # (the original leaked the connection on any exception).
        conn.close()
    logger.info("Database creation complete.")


def _clean_row(r):
    """Coerce one CSV record dict into the tuple shape the INSERT expects."""
    try:
        # TypeError added: a non-string, non-numeric cell should also fall back.
        ratings = float(r["ratings"]) if r["ratings"] else 0.0
    except (ValueError, TypeError):
        ratings = 0.0
    try:
        no_of_ratings = int(r["no_of_ratings"]) if r["no_of_ratings"] else 0
    except (ValueError, TypeError):
        no_of_ratings = 0
    return (
        str(r["name"]),
        str(r["image"]),
        str(r["link"]),
        ratings,
        no_of_ratings,
        str(r["discount_price"]),
        str(r["actual_price"]),
        str(r["search_terms"]),
        str(r["recommended_5"]),
        str(r["category"]),
    )
# Flask application setup.  Server-side (filesystem) sessions hold the RAG
# chat history so it survives across requests.
app = Flask(__name__)
# Fallback value is a placeholder — set FLASK_SECRET_KEY in the environment.
app.secret_key = os.getenv("FLASK_SECRET_KEY", "YOUR_SECURE_RANDOM_KEY")
app.config["SESSION_TYPE"] = "filesystem"
Session(app)
@app.route("/")
def index():
    """Render the landing page containing the product search bar."""
    home_template = "index.html"
    return render_template(home_template)
@app.route("/autocomplete")
def autocomplete():
    """Return up to 10 {id, name} JSON matches for a case-insensitive substring search on name."""
    q = request.args.get("q", "").strip()
    if not q:
        return jsonify([])
    conn = sqlite3.connect(DB_PATH)
    try:
        sql = f"""
        SELECT id, name
        FROM {TABLE_NAME}
        WHERE LOWER(name) LIKE LOWER(?)
        LIMIT 10
        """
        # NOTE(review): '%' / '_' in q act as LIKE wildcards — acceptable for
        # autocomplete, but escape them if exact-substring semantics are wanted.
        rows = conn.execute(sql, (f"%{q}%",)).fetchall()
    finally:
        # Close on every path, including query errors (original leaked the handle).
        conn.close()
    return jsonify([{"id": r[0], "name": r[1]} for r in rows])
@app.route("/product/<int:item_id>")
def show_product(item_id):
    """Render a product detail page plus up to five recommended products.

    The `recommended_5` column stores a Python-literal list of product
    names; each name is resolved back to a product row via a LIKE lookup.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        row = conn.execute(
            f"SELECT * FROM {TABLE_NAME} WHERE id=?", (item_id,)
        ).fetchone()
        if not row:
            return "<h2>Product not found</h2>", 404
        # Positional mapping mirrors the column order in CREATE TABLE.
        product = {
            "id": row[0],
            "name": row[1],
            "image": row[2],
            "link": row[3],
            "ratings": row[4],
            "no_of_ratings": row[5],
            "discount_price": row[6],
            "actual_price": row[7],
            "search_terms": row[8],
            "recommended_5": row[9],
            "category": row[10],
        }
        # Parse recommended_5; any malformed value means "no recommendations".
        # Exception list narrowed from the original bare `except:`.
        try:
            rec_list = ast.literal_eval(product["recommended_5"])
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            rec_list = []
        if not isinstance(rec_list, list):
            rec_list = []
        recommended_details = []
        for rec_name in rec_list[:5]:
            rec_row = conn.execute(
                f"SELECT * FROM {TABLE_NAME} WHERE name LIKE ? LIMIT 1",
                (f"%{rec_name}%",),
            ).fetchone()
            if rec_row:
                recommended_details.append({
                    "id": rec_row[0],
                    "name": rec_row[1],
                    "image": rec_row[2],
                    "link": rec_row[3],
                    "discount_price": rec_row[6],
                })
    finally:
        # Connection closed on every path, including exceptions
        # (the original leaked it if any query raised).
        conn.close()
    return render_template("product.html",
                           product=product,
                           recommended=recommended_details)
@app.route("/rag")
def rag_index():
    """Render the RAG chat page, initialising the session history if needed."""
    history = session.setdefault("rag_chat", [])
    return render_template("rag.html", chat_history=history)
@app.route("/rag/query", methods=["POST"])
def rag_query():
    """Handle one RAG chat turn: parse the query, ground it in the DB, ask Gemini."""
    history = session.setdefault("rag_chat", [])

    user_input = request.form.get("rag_input", "").strip()
    if not user_input:
        return redirect(url_for("rag_index"))

    # Record the user's turn before building the prompt so it is included.
    history.append(("user", user_input))

    # Query understanding -> DB retrieval -> prompt construction.
    brand_kw, ptype, price_cap = extract_query_parameters(user_input)
    matches = filter_database(brand_kw, ptype, price_cap)
    db_context = build_db_context(matches, brand_kw, ptype, price_cap)
    prompt_text = construct_prompt(history, db_context)

    reply = gemini_generate_content(
        api_key=GEMINI_API_KEY,
        conversation_text=prompt_text,
    )
    history.append(("assistant", reply))

    # In-place list mutation is not auto-detected by the session backend.
    session.modified = True
    return render_template("rag.html", chat_history=history)
def _extract_price(text):
    """Return an int price cap parsed from 'under 5000' / 'below 25k' style phrases, else None."""
    m = re.search(r'(under|below)\s+₹?(\d+[kK]?)', text)
    if not m:
        return None
    raw = m.group(2)
    # A trailing 'k' means thousands ("25k" -> 25000).
    if raw.lower().endswith('k'):
        return int(raw[:-1]) * 1000
    return int(raw)


def extract_query_parameters(user_query):
    """
    Extract (brand, product_type, price) from the user's query.

    Product types are matched against the distinct `category` and
    `search_terms` values in the database; brand candidates are the
    individual words of `search_terms`.  Any field that cannot be
    inferred comes back as None.

    Args:
        user_query: Raw user chat message.

    Returns:
        Tuple (brand or None, product_type or None, price int or None).
    """
    user_lower = user_query.lower()
    price = _extract_price(user_lower)

    conn = sqlite3.connect(DB_PATH)
    try:
        cur = conn.cursor()
        cur.execute(f"SELECT DISTINCT category FROM {TABLE_NAME}")
        # Skip NULL/empty values: "" is a substring of every query, so the
        # original matched the first empty category unconditionally; None
        # values crashed on .lower().
        categories = [row[0].lower() for row in cur.fetchall() if row[0]]
        cur.execute(f"SELECT DISTINCT search_terms FROM {TABLE_NAME}")
        search_terms = [row[0].lower() for row in cur.fetchall() if row[0]]
    finally:
        conn.close()

    # Product type: first category appearing in the query, else first search term.
    product_type = next((c for c in categories if c in user_lower), None)
    if product_type is None:
        product_type = next((t for t in search_terms if t in user_lower), None)

    # Brand: compare candidate words against whole words of the query so a
    # short brand like "hp" no longer matches inside e.g. "shopping"
    # (the original used a raw substring test).
    query_words = set(re.findall(r'\w+', user_lower))
    brand = None
    for term in search_terms:
        for word in term.split():
            if word in query_words:
                brand = word
                break
        if brand:
            break
    return brand, product_type, price
def filter_database(brand, product_type, price):
    """
    Return (id, name, discount_price, recommended_5) rows matching the filters.

    Args:
        brand:        Substring matched case-insensitively against `name`, or None.
        product_type: Substring matched case-insensitively against `category`, or None.
        price:        Inclusive upper bound on the numeric discount price, or None.

    Returns:
        List of row tuples, capped at 5000 for performance.
    """
    sql = f"SELECT id, name, discount_price, recommended_5 FROM {TABLE_NAME} WHERE 1=1"
    params = []
    if brand:
        sql += " AND LOWER(name) LIKE ?"
        params.append(f"%{brand}%")
    if product_type:
        sql += " AND LOWER(category) LIKE ?"
        params.append(f"%{product_type}%")
    if price is not None:
        # discount_price is stored as text like "₹1,299"; strip the currency
        # symbol and thousands separators before the numeric comparison.
        # Rows with an empty price CAST to 0 and would match any cap, so
        # exclude them explicitly (bug in the original query).
        sql += (
            " AND discount_price != ''"
            " AND CAST(REPLACE(REPLACE(discount_price, '₹', ''), ',', '') AS INTEGER) <= ?"
        )
        params.append(price)
    sql += " LIMIT 5000"  # cap result size; adjust as needed

    conn = sqlite3.connect(DB_PATH)
    try:
        rows = conn.execute(sql, tuple(params)).fetchall()
    finally:
        # Close on every path (original leaked the handle on query errors).
        conn.close()
    return rows
def build_db_context(matched_items, brand, product_type, price):
    """Summarise matched DB rows as plain text for the Gemini prompt.

    Lists at most the first ten items; states explicitly when nothing matched.
    """
    if not matched_items:
        return "No matching items found in the database.\n"

    header = f"Found {len(matched_items)} items"
    if price:
        header += f" under ₹{price}"
    if brand or product_type:
        header += " matching your criteria"

    # Keep the prompt compact: only the first 10 matches are enumerated.
    lines = [header + ":"]
    lines.extend(f"- {item[1]} at ₹{item[2]}" for item in matched_items[:10])
    return "\n".join(lines) + "\n"
def construct_prompt(chat_history, db_context):
    """Assemble the Gemini prompt: system framing, conversation log, DB context."""
    pieces = [
        "You are an intelligent assistant that provides product recommendations based on the user's query and the available database.\n\n"
        "Conversation so far:\n"
    ]
    # One line per turn, speaker label capitalised ("User: ...", "Assistant: ...").
    pieces.extend(f"{speaker.capitalize()}: {message}\n" for speaker, message in chat_history)
    pieces.append(f"\nDatabase Context:\n{db_context}\n")
    pieces.append("Based on the above information, provide a helpful and concise answer to the user's query.")
    return "".join(pieces)
def gemini_generate_content(api_key, conversation_text):
    """
    Call the Gemini generateContent endpoint and return the assistant's text.

    Never raises: every failure mode (network error, invalid JSON, non-200
    status, unexpected payload shape) is logged and returned as a descriptive
    string so the chat UI can display it.

    Args:
        api_key:           Gemini API key, passed as a query parameter.
        conversation_text: Fully constructed prompt to send.

    Returns:
        The assistant's reply text, or an error/debug message string.
    """
    url = f"{GEMINI_ENDPOINT}?key={api_key}"
    payload = {"contents": [{"parts": [{"text": conversation_text}]}]}
    try:
        # `json=` sets the Content-Type header automatically; the timeout
        # prevents a hung request from blocking the worker forever
        # (the original call had no timeout).
        resp = requests.post(url, json=payload, timeout=30)
    except requests.RequestException as e:
        logger.error(f"Error during Gemini API request: {e}")
        return f"[Gemini Error] Failed to connect to Gemini API: {e}"
    try:
        data = resp.json()
    except ValueError as e:
        # requests' JSONDecodeError subclasses ValueError.
        logger.error(f"Invalid JSON response from Gemini API: {e}")
        return f"[Gemini Error] Invalid JSON response: {e}"
    if resp.status_code != 200:
        logger.error(f"Gemini API returned error {resp.status_code}: {data}")
        return f"[Gemini Error {resp.status_code}] {json.dumps(data, indent=2)}"
    # Parse the "candidates" structure; surface debug JSON when it is missing.
    candidates = data.get("candidates", [])
    if not candidates:
        logger.error(f"No candidates received from Gemini API: {data}")
        return f"No candidates received. Debug JSON: {json.dumps(data, indent=2)}"
    parts = candidates[0].get("content", {}).get("parts", [])
    if not parts:
        logger.error(f"No 'parts' found in candidate content: {data}")
        return f"No 'parts' found in candidate content. Debug JSON: {json.dumps(data, indent=2)}"
    assistant_reply = parts[0].get("text", "(No text found in the response)")
    logger.info(f"Gemini Assistant Reply: {assistant_reply}")
    return assistant_reply
def main():
    """Build the product DB from the CSV if absent, then start the Flask server."""
    create_db_from_csv(CSV_PATH, DB_PATH)
    # Fixed: the original log claimed port 5000 while the server binds 7860.
    logger.info("Starting Flask server at http://0.0.0.0:7860")
    app.run(host="0.0.0.0", port=7860)
# Script entry point: build the DB (if needed) and run the dev server.
if __name__ == "__main__":
    main()