|
|
from fastapi import FastAPI, File, UploadFile, HTTPException, Depends, Form
|
|
|
from fastapi.security import HTTPBasic, HTTPBasicCredentials
|
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
|
import easyocr
|
|
|
import numpy as np
|
|
|
from PIL import Image
|
|
|
import io
|
|
|
import asyncio
|
|
|
from typing import Optional, List, Dict
|
|
|
import json
|
|
|
import os
|
|
|
from datetime import datetime
|
|
|
|
|
|
|
|
|
from utils.scraper import Scraper
|
|
|
from utils.health_score import HealthScoreCalculator
|
|
|
from models.database import Database
|
|
|
|
|
|
app = FastAPI(title="IndiScan API")
|
|
|
security = HTTPBasic()
|
|
|
db = Database()
|
|
|
health_calculator = HealthScoreCalculator()
|
|
|
|
|
|
|
|
|
app.add_middleware(
|
|
|
CORSMiddleware,
|
|
|
allow_origins=["*"],
|
|
|
allow_credentials=True,
|
|
|
allow_methods=["*"],
|
|
|
allow_headers=["*"],
|
|
|
)
|
|
|
|
|
|
|
|
|
reader = easyocr.Reader(['en'])
|
|
|
|
|
|
def verify_admin(credentials: HTTPBasicCredentials = Depends(security)):
|
|
|
is_admin = db.verify_admin(credentials.username, credentials.password)
|
|
|
if not is_admin:
|
|
|
raise HTTPException(
|
|
|
status_code=401,
|
|
|
detail="Invalid credentials",
|
|
|
headers={"WWW-Authenticate": "Basic"},
|
|
|
)
|
|
|
return credentials.username
|
|
|
|
|
|
@app.post("/scan/barcode")
|
|
|
async def scan_barcode(barcode: str):
|
|
|
"""Scan a product by barcode"""
|
|
|
|
|
|
product = db.get_product(barcode)
|
|
|
if product:
|
|
|
return product
|
|
|
|
|
|
|
|
|
async with Scraper() as scraper:
|
|
|
prices = await scraper.get_all_prices(barcode)
|
|
|
if not prices:
|
|
|
raise HTTPException(status_code=404, detail="Product not found")
|
|
|
|
|
|
|
|
|
first_result = prices[0]
|
|
|
product_data = {
|
|
|
'barcode': barcode,
|
|
|
'name': first_result['title'],
|
|
|
'prices': prices,
|
|
|
'last_updated': datetime.now().isoformat()
|
|
|
}
|
|
|
|
|
|
|
|
|
db.add_product(product_data)
|
|
|
return product_data
|
|
|
|
|
|
@app.post("/scan/image")
|
|
|
async def scan_image(file: UploadFile = File(...)):
|
|
|
"""Scan product image for ingredients"""
|
|
|
contents = await file.read()
|
|
|
image = Image.open(io.BytesIO(contents))
|
|
|
|
|
|
|
|
|
image_np = np.array(image)
|
|
|
|
|
|
|
|
|
results = reader.readtext(image_np)
|
|
|
text = ' '.join([result[1] for result in results])
|
|
|
|
|
|
|
|
|
async with Scraper() as scraper:
|
|
|
ingredients = scraper.extract_ingredients(text)
|
|
|
nutrition_info = scraper.extract_nutrition_info(text)
|
|
|
|
|
|
|
|
|
health_score = health_calculator.calculate_score(ingredients)
|
|
|
nutrition_analysis = health_calculator.analyze_nutrition(nutrition_info)
|
|
|
|
|
|
return {
|
|
|
'ingredients': ingredients,
|
|
|
'nutrition_info': nutrition_info,
|
|
|
'health_score': health_score,
|
|
|
'nutrition_analysis': nutrition_analysis
|
|
|
}
|
|
|
|
|
|
@app.post("/analyze/text")
|
|
|
async def analyze_text(text: str = Form(...), product_type: str = Form("food")):
|
|
|
"""Analyze product from text description"""
|
|
|
async with Scraper() as scraper:
|
|
|
ingredients = scraper.extract_ingredients(text)
|
|
|
nutrition_info = scraper.extract_nutrition_info(text)
|
|
|
|
|
|
health_score = health_calculator.calculate_score(ingredients, product_type)
|
|
|
nutrition_analysis = health_calculator.analyze_nutrition(nutrition_info)
|
|
|
|
|
|
return {
|
|
|
'ingredients': ingredients,
|
|
|
'nutrition_info': nutrition_info,
|
|
|
'health_score': health_score,
|
|
|
'nutrition_analysis': nutrition_analysis
|
|
|
}
|
|
|
|
|
|
@app.get("/products/{barcode}")
|
|
|
async def get_product(barcode: str):
|
|
|
"""Get product information by barcode"""
|
|
|
product = db.get_product(barcode)
|
|
|
if not product:
|
|
|
raise HTTPException(status_code=404, detail="Product not found")
|
|
|
return product
|
|
|
|
|
|
@app.post("/products/add")
|
|
|
async def add_product(
|
|
|
barcode: str = Form(...),
|
|
|
name: str = Form(...),
|
|
|
ingredients: str = Form(...),
|
|
|
product_type: str = Form("food"),
|
|
|
admin_user: str = Depends(verify_admin)
|
|
|
):
|
|
|
"""Add or update product information (admin only)"""
|
|
|
try:
|
|
|
ingredients_list = json.loads(ingredients)
|
|
|
except json.JSONDecodeError:
|
|
|
ingredients_list = [i.strip() for i in ingredients.split(',')]
|
|
|
|
|
|
product_data = {
|
|
|
'barcode': barcode,
|
|
|
'name': name,
|
|
|
'ingredients': ingredients_list,
|
|
|
'product_type': product_type,
|
|
|
'added_by': admin_user,
|
|
|
'is_verified': True,
|
|
|
'last_updated': datetime.now().isoformat()
|
|
|
}
|
|
|
|
|
|
db.add_product(product_data)
|
|
|
return {"message": "Product added successfully"}
|
|
|
|
|
|
@app.get("/products/update")
|
|
|
async def update_products(admin_user: str = Depends(verify_admin)):
|
|
|
"""Update products that haven't been updated in 60 days (admin only)"""
|
|
|
products_to_update = db.get_products_for_update()
|
|
|
|
|
|
async with Scraper() as scraper:
|
|
|
for barcode in products_to_update:
|
|
|
try:
|
|
|
prices = await scraper.get_all_prices(barcode)
|
|
|
if prices:
|
|
|
first_result = prices[0]
|
|
|
product_data = {
|
|
|
'barcode': barcode,
|
|
|
'name': first_result['title'],
|
|
|
'prices': prices,
|
|
|
'last_updated': datetime.now().isoformat()
|
|
|
}
|
|
|
db.add_product(product_data)
|
|
|
except Exception as e:
|
|
|
print(f"Error updating product {barcode}: {str(e)}")
|
|
|
continue
|
|
|
|
|
|
return {"message": f"Updated {len(products_to_update)} products"}
|
|
|
|
|
|
@app.get("/export")
|
|
|
async def export_data(admin_user: str = Depends(verify_admin)):
|
|
|
"""Export database to CSV (admin only)"""
|
|
|
try:
|
|
|
export_dir = "data/exports"
|
|
|
os.makedirs(export_dir, exist_ok=True)
|
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
|
export_path = f"{export_dir}/export_{timestamp}"
|
|
|
db.export_to_csv(export_path)
|
|
|
return {"message": f"Data exported to {export_path}"}
|
|
|
except Exception as e:
|
|
|
raise HTTPException(status_code=500, detail=f"Export failed: {str(e)}")
|
|
|
|
|
|
@app.post("/import")
|
|
|
async def import_data(file: UploadFile = File(...), admin_user: str = Depends(verify_admin)):
|
|
|
"""Import data from CSV (admin only)"""
|
|
|
try:
|
|
|
contents = await file.read()
|
|
|
import_dir = "data/imports"
|
|
|
os.makedirs(import_dir, exist_ok=True)
|
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
|
import_path = f"{import_dir}/import_{timestamp}"
|
|
|
|
|
|
with open(import_path, 'wb') as f:
|
|
|
f.write(contents)
|
|
|
|
|
|
db.import_from_csv(import_path)
|
|
|
return {"message": "Data imported successfully"}
|
|
|
except Exception as e:
|
|
|
raise HTTPException(status_code=500, detail=f"Import failed: {str(e)}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
import uvicorn
|
|
|
uvicorn.run(app, host="0.0.0.0", port=8000) |