# IndiScan / models / database.py
# Wendgan's picture
# Upload 9 files
# 2ae3f7c verified
import json
import os
import sqlite3
from contextlib import closing
from datetime import datetime, timedelta
from typing import List, Optional, Dict

import pandas as pd
class Database:
    """SQLite-backed store for products, ingredients, users and price history.

    Every public method opens its own short-lived connection to ``db_path``
    and closes it before returning, including when an error is raised.
    """

    # Tables handled by export_to_csv / import_from_csv (``users`` is not).
    _CSV_TABLES = ("products", "ingredients", "price_tracking")

    def __init__(self, db_path: str = "data/indiscan.db"):
        """Store the database location and make sure the schema exists.

        Args:
            db_path: Path of the SQLite database file.
        """
        self.db_path = db_path
        self.init_db()

    def _connect(self):
        """Return a context manager whose connection is closed on exit.

        ``sqlite3.Connection`` used directly in a ``with`` statement only
        commits or rolls back; it does not close, hence the ``closing`` wrapper.
        """
        return closing(sqlite3.connect(self.db_path))

    def init_db(self):
        """Create the products, ingredients, users and price_tracking tables.

        Existing tables are left untouched (``CREATE TABLE IF NOT EXISTS``).
        The parent directory of ``db_path`` is created when it is missing,
        because sqlite3 cannot open a file inside a non-existent directory.
        """
        parent_dir = os.path.dirname(self.db_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        with self._connect() as conn:
            c = conn.cursor()
            # Create products table
            c.execute('''
                CREATE TABLE IF NOT EXISTS products (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    barcode TEXT UNIQUE,
                    name TEXT,
                    brand TEXT,
                    category TEXT,
                    ingredients TEXT,
                    nutrition_info TEXT,
                    health_score INTEGER,
                    last_updated TIMESTAMP,
                    image_url TEXT,
                    product_type TEXT,
                    added_by TEXT,
                    is_verified BOOLEAN DEFAULT 0
                )
            ''')
            # Create ingredients table
            c.execute('''
                CREATE TABLE IF NOT EXISTS ingredients (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT UNIQUE,
                    risk_score INTEGER,
                    description TEXT,
                    category TEXT,
                    concerns TEXT
                )
            ''')
            # Create users table for admin control
            c.execute('''
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    username TEXT UNIQUE,
                    password_hash TEXT,
                    is_admin BOOLEAN DEFAULT 0
                )
            ''')
            # Create price tracking table
            c.execute('''
                CREATE TABLE IF NOT EXISTS price_tracking (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    product_id INTEGER,
                    platform TEXT,
                    price REAL,
                    timestamp TIMESTAMP,
                    url TEXT,
                    FOREIGN KEY (product_id) REFERENCES products (id)
                )
            ''')
            conn.commit()

    def add_product(self, product_data: Dict) -> int:
        """Insert a product, or update the stored row with the same barcode.

        ``last_updated`` is set to the current local time. A list under
        ``ingredients`` and a dict under ``nutrition_info`` are stored as JSON
        text. The caller's dict is not modified.

        Args:
            product_data: Mapping of ``products`` column names to values.

        Returns:
            The ``id`` of the inserted or updated row.

        Raises:
            sqlite3.OperationalError: If a key is not a column of ``products``.
            sqlite3.IntegrityError: If the insert violates a constraint and no
                existing row matches the given barcode.
        """
        # Work on a copy so the caller's dict is left untouched.
        record = dict(product_data)
        record['last_updated'] = datetime.now().isoformat()
        if isinstance(record.get('ingredients'), list):
            record['ingredients'] = json.dumps(record['ingredients'])
        if isinstance(record.get('nutrition_info'), dict):
            record['nutrition_info'] = json.dumps(record['nutrition_info'])

        with self._connect() as conn:
            c = conn.cursor()

            # Column names end up inside the SQL text, so accept only names
            # that really are columns of the products table.
            c.execute("PRAGMA table_info(products)")
            known_columns = {row[1] for row in c.fetchall()}
            for key in record:
                if key not in known_columns:
                    raise sqlite3.OperationalError(
                        f"table products has no column named {key}"
                    )

            columns = ', '.join(record)
            placeholders = ', '.join('?' for _ in record)
            values = tuple(record.values())

            try:
                c.execute(
                    f"INSERT INTO products ({columns}) VALUES ({placeholders})",
                    values,
                )
                product_id = c.lastrowid
                conn.commit()
                return product_id
            except sqlite3.IntegrityError:
                # Update existing product
                barcode = record.get('barcode')
                update_cols = ', '.join(f"{k}=?" for k in record)
                c.execute(
                    f"UPDATE products SET {update_cols} WHERE barcode=?",
                    (*values, barcode),
                )
                # lastrowid is not meaningful after an UPDATE; look the id up.
                c.execute("SELECT id FROM products WHERE barcode=?", (barcode,))
                existing = c.fetchone()
                if existing is None:
                    # The conflict was not a duplicate barcode: nothing was
                    # updated, so surface the original error.
                    raise
                conn.commit()
                return existing[0]

    def get_product(self, barcode: str) -> Optional[Dict]:
        """Return the product row for ``barcode`` as a dict, or ``None``.

        Non-empty ``ingredients`` and ``nutrition_info`` values are decoded
        from JSON text back into Python objects.
        """
        with self._connect() as conn:
            c = conn.cursor()
            c.execute("SELECT * FROM products WHERE barcode=?", (barcode,))
            row = c.fetchone()
            if row is None:
                return None
            columns = [description[0] for description in c.description]

        product = dict(zip(columns, row))
        # Parse JSON strings back to Python objects
        for field in ('ingredients', 'nutrition_info'):
            if product[field]:
                product[field] = json.loads(product[field])
        return product

    def update_prices(self, product_id: int, prices: List[Dict]):
        """Append one price_tracking row per entry of ``prices``.

        All rows share a single timestamp taken when the method is called.

        Args:
            product_id: Value stored in the ``product_id`` column.
            prices: Dicts with the keys ``platform``, ``price`` and ``url``.

        Raises:
            KeyError: If an entry lacks one of the required keys; in that case
                nothing is written.
        """
        timestamp = datetime.now().isoformat()
        rows = [
            (product_id, entry['platform'], entry['price'], timestamp, entry['url'])
            for entry in prices
        ]
        with self._connect() as conn:
            conn.executemany(
                """
                INSERT INTO price_tracking (product_id, platform, price, timestamp, url)
                VALUES (?, ?, ?, ?, ?)
                """,
                rows,
            )
            conn.commit()

    def get_products_for_update(self) -> List[str]:
        """Get products that haven't been updated in 60 days.

        Returns:
            Barcodes of products whose ``last_updated`` is older than 60 days
            or is NULL.
        """
        sixty_days_ago = (datetime.now() - timedelta(days=60)).isoformat()
        with self._connect() as conn:
            c = conn.cursor()
            c.execute(
                """
                SELECT barcode FROM products
                WHERE last_updated < ? OR last_updated IS NULL
                """,
                (sixty_days_ago,),
            )
            return [row[0] for row in c.fetchall()]

    def export_to_csv(self, filepath: str):
        """Export the database to CSV files.

        Writes ``products.csv``, ``ingredients.csv`` and ``price_tracking.csv``
        into the directory ``filepath``, creating that directory if needed.
        The ``users`` table is not exported.
        """
        if filepath:
            os.makedirs(filepath, exist_ok=True)
        with self._connect() as conn:
            for table in self._CSV_TABLES:
                frame = pd.read_sql_query(f"SELECT * FROM {table}", conn)
                frame.to_csv(os.path.join(filepath, f"{table}.csv"), index=False)

    def import_from_csv(self, filepath: str):
        """Import data from CSV files.

        Reads ``products.csv``, ``ingredients.csv`` and ``price_tracking.csv``
        from the directory ``filepath`` and appends their rows to the tables
        of the same name.
        """
        with self._connect() as conn:
            for table in self._CSV_TABLES:
                csv_path = os.path.join(filepath, f"{table}.csv")
                if table == "products":
                    # Barcodes are text; letting pandas infer an integer type
                    # would drop leading zeros.
                    frame = pd.read_csv(csv_path, dtype={"barcode": str})
                else:
                    frame = pd.read_csv(csv_path)
                frame.to_sql(table, conn, if_exists='append', index=False)
            conn.commit()

    def verify_admin(self, username: str, password_hash: str) -> bool:
        """Return True if a user row matches both values and has is_admin set.

        Args:
            username: Value compared against the ``username`` column.
            password_hash: Value compared against the ``password_hash`` column.
        """
        with self._connect() as conn:
            c = conn.cursor()
            c.execute(
                "SELECT is_admin FROM users WHERE username=? AND password_hash=?",
                (username, password_hash),
            )
            result = c.fetchone()
        return bool(result and result[0])