import json
import os
import sqlite3
from contextlib import closing
from datetime import datetime, timedelta
from typing import List, Optional, Dict

import pandas as pd


class Database:
    """SQLite-backed storage for products, ingredients, users and prices.

    Every method opens its own connection to ``db_path`` and closes it before
    returning, including when an error is raised, so an instance never holds
    an open handle between calls.
    """

    # DDL executed by ``init_db``; each statement is idempotent.
    _SCHEMA = (
        '''
        CREATE TABLE IF NOT EXISTS products (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            barcode TEXT UNIQUE,
            name TEXT,
            brand TEXT,
            category TEXT,
            ingredients TEXT,
            nutrition_info TEXT,
            health_score INTEGER,
            last_updated TIMESTAMP,
            image_url TEXT,
            product_type TEXT,
            added_by TEXT,
            is_verified BOOLEAN DEFAULT 0
        )
        ''',
        '''
        CREATE TABLE IF NOT EXISTS ingredients (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT UNIQUE,
            risk_score INTEGER,
            description TEXT,
            category TEXT,
            concerns TEXT
        )
        ''',
        '''
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT UNIQUE,
            password_hash TEXT,
            is_admin BOOLEAN DEFAULT 0
        )
        ''',
        '''
        CREATE TABLE IF NOT EXISTS price_tracking (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            product_id INTEGER,
            platform TEXT,
            price REAL,
            timestamp TIMESTAMP,
            url TEXT,
            FOREIGN KEY (product_id) REFERENCES products (id)
        )
        ''',
    )

    # Column names cannot be bound as SQL parameters, so ``add_product``
    # checks the caller's keys against this allow-list before interpolating.
    _PRODUCT_COLUMNS = (
        'id', 'barcode', 'name', 'brand', 'category', 'ingredients',
        'nutrition_info', 'health_score', 'last_updated', 'image_url',
        'product_type', 'added_by', 'is_verified',
    )

    # Tables covered by CSV export/import, in import order. ``users`` is not
    # part of the CSV round trip.
    _CSV_TABLES = ('products', 'ingredients', 'price_tracking')

    def __init__(self, db_path: str = "data/indiscan.db"):
        """Remember the database location and make sure the schema exists.

        Args:
            db_path: Path of the SQLite database file.
        """
        self.db_path = db_path
        self.init_db()

    def _connect(self):
        """Return a context manager yielding a connection closed on exit."""
        return closing(sqlite3.connect(self.db_path))

    @staticmethod
    def _decode_json(raw):
        """Decode a stored JSON column, falling back to the raw value.

        ``add_product`` only JSON-encodes lists and dicts, so a column may
        also hold a plain string that is not valid JSON.
        """
        if not raw:
            return raw
        try:
            return json.loads(raw)
        except (TypeError, ValueError):
            return raw

    def init_db(self):
        """Create the database file's directory and any missing tables."""
        parent = os.path.dirname(self.db_path)
        if parent:
            # sqlite3 cannot create missing directories on its own.
            os.makedirs(parent, exist_ok=True)

        with self._connect() as conn, conn:
            for statement in self._SCHEMA:
                conn.execute(statement)

    def add_product(self, product_data: Dict) -> int:
        """Insert a product, or update the existing row with the same barcode.

        ``last_updated`` is always set to the current local time. A list under
        ``ingredients`` and a dict under ``nutrition_info`` are stored as
        JSON text. The caller's dict is left untouched.

        Args:
            product_data: Mapping of ``products`` column names to values.

        Returns:
            The ``id`` of the inserted or updated row.

        Raises:
            sqlite3.OperationalError: If a key is not a ``products`` column.
            sqlite3.IntegrityError: If the insert violates a constraint and
                no existing row can be updated by barcode.
        """
        record = dict(product_data)  # never mutate the caller's mapping

        unknown = [key for key in record if key not in self._PRODUCT_COLUMNS]
        if unknown:
            # Same exception type SQLite raises for an unknown column, but
            # raised before any untrusted text reaches the SQL string.
            raise sqlite3.OperationalError(
                f"table products has no column named {unknown[0]}"
            )

        record['last_updated'] = datetime.now().isoformat()
        if isinstance(record.get('ingredients'), list):
            record['ingredients'] = json.dumps(record['ingredients'])
        if isinstance(record.get('nutrition_info'), dict):
            record['nutrition_info'] = json.dumps(record['nutrition_info'])

        columns = list(record)
        values = tuple(record.values())
        column_sql = ', '.join(columns)
        placeholders = ', '.join('?' for _ in columns)

        with self._connect() as conn:
            try:
                with conn:
                    cursor = conn.execute(
                        f"INSERT INTO products ({column_sql}) VALUES ({placeholders})",
                        values,
                    )
                return cursor.lastrowid
            except sqlite3.IntegrityError:
                barcode = record.get('barcode')
                if barcode is None:
                    # Nothing to match an existing row on.
                    raise

                assignments = ', '.join(f"{column}=?" for column in columns)
                with conn:
                    conn.execute(
                        f"UPDATE products SET {assignments} WHERE barcode=?",
                        (*values, barcode),
                    )
                    # lastrowid is not meaningful after an UPDATE, so look
                    # the row id up explicitly.
                    row = conn.execute(
                        "SELECT id FROM products WHERE barcode=?", (barcode,)
                    ).fetchone()
                if row is None:
                    # The conflict was not on this barcode.
                    raise
                return row[0]

    def get_product(self, barcode: str) -> Optional[Dict]:
        """Return the product with the given barcode as a dict, or ``None``.

        ``ingredients`` and ``nutrition_info`` are JSON-decoded when they
        hold valid JSON and returned unchanged otherwise.
        """
        with self._connect() as conn:
            conn.row_factory = sqlite3.Row
            row = conn.execute(
                "SELECT * FROM products WHERE barcode=?", (barcode,)
            ).fetchone()

        if row is None:
            return None

        product = dict(row)
        for field in ('ingredients', 'nutrition_info'):
            product[field] = self._decode_json(product[field])
        return product

    def update_prices(self, product_id: int, prices: List[Dict]):
        """Record one price observation per entry of ``prices``.

        All rows share one timestamp and are written in one transaction.

        Args:
            product_id: Value stored in ``price_tracking.product_id``.
            prices: Dicts with ``platform`` and ``price`` keys and an
                optional ``url`` key (stored as NULL when absent).
        """
        timestamp = datetime.now().isoformat()
        rows = [
            (product_id, entry['platform'], entry['price'], timestamp, entry.get('url'))
            for entry in prices
        ]

        with self._connect() as conn, conn:
            conn.executemany(
                """
                INSERT INTO price_tracking (product_id, platform, price, timestamp, url)
                VALUES (?, ?, ?, ?, ?)
                """,
                rows,
            )

    def get_products_for_update(self, max_age_days: int = 60) -> List[str]:
        """Return barcodes of products not updated within ``max_age_days``.

        Products whose ``last_updated`` is NULL are included as well.

        Args:
            max_age_days: Age threshold in days; defaults to 60.
        """
        cutoff = (datetime.now() - timedelta(days=max_age_days)).isoformat()

        with self._connect() as conn:
            rows = conn.execute(
                """
                SELECT barcode FROM products
                WHERE last_updated < ? OR last_updated IS NULL
                """,
                (cutoff,),
            ).fetchall()

        return [row[0] for row in rows]

    def export_to_csv(self, filepath: str):
        """Write the products, ingredients and price_tracking tables to CSV.

        Args:
            filepath: Directory receiving ``<table>.csv`` files; created if
                it does not exist.
        """
        if filepath:
            os.makedirs(filepath, exist_ok=True)

        with self._connect() as conn:
            for table in self._CSV_TABLES:
                frame = pd.read_sql_query(f"SELECT * FROM {table}", conn)
                frame.to_csv(os.path.join(filepath, f"{table}.csv"), index=False)

    def import_from_csv(self, filepath: str):
        """Append rows from ``<table>.csv`` files in ``filepath`` to the tables.

        All three files are read before anything is written, so a missing or
        unreadable file cannot leave a partial import. Rows are appended;
        NOTE(review): rows that collide with existing primary-key or UNIQUE
        values are presumably rejected by the table constraints - confirm the
        intended behavior when importing into a non-empty database.
        """
        frames = []
        for table in self._CSV_TABLES:
            csv_path = os.path.join(filepath, f"{table}.csv")
            # Keep barcodes as text so leading zeros survive the round trip.
            dtype = {'barcode': str} if table == 'products' else None
            frames.append((table, pd.read_csv(csv_path, dtype=dtype)))

        with self._connect() as conn:
            for table, frame in frames:
                frame.to_sql(table, conn, if_exists='append', index=False)
            conn.commit()

    def verify_admin(self, username: str, password_hash: str) -> bool:
        """Return True if a matching user exists and its ``is_admin`` is truthy.

        A user matches when both ``username`` and ``password_hash`` equal the
        stored values.
        """
        with self._connect() as conn:
            row = conn.execute(
                "SELECT is_admin FROM users WHERE username=? AND password_hash=?",
                (username, password_hash),
            ).fetchone()

        return bool(row and row[0])