# File size: 7,467 Bytes
# 2ae3f7c
from fastapi import FastAPI, File, UploadFile, HTTPException, Depends, Form
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from fastapi.middleware.cors import CORSMiddleware
import easyocr
import numpy as np
from PIL import Image
import io
import asyncio
from typing import Optional, List, Dict
import json
import os
from datetime import datetime
# Import our utilities
from utils.scraper import Scraper
from utils.health_score import HealthScoreCalculator
from models.database import Database
# Application object plus the module-level collaborators shared by the
# route handlers defined in this file.
app = FastAPI(title="IndiScan API")
security = HTTPBasic()
db = Database()
health_calculator = HealthScoreCalculator()

# CORS policy: every origin, method and header is accepted, with
# credentials enabled.
# NOTE(review): the FastAPI CORS documentation advises against combining
# wildcard origins with allow_credentials=True; consider listing the
# explicit front-end origins instead. Left unchanged here on purpose.
_cors_options = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_options)

# English-language EasyOCR reader, built once at import time and reused
# by the image-scanning endpoint.
reader = easyocr.Reader(['en'])
def verify_admin(credentials: HTTPBasicCredentials = Depends(security)):
    """Authenticate an admin from HTTP Basic credentials.

    The username/password pair is checked with ``db.verify_admin``.

    Returns:
        The username taken from the credentials when the check passes.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Basic`` header when
            the check fails.
    """
    username = credentials.username
    if db.verify_admin(username, credentials.password):
        return username

    raise HTTPException(
        status_code=401,
        detail="Invalid credentials",
        headers={"WWW-Authenticate": "Basic"},
    )
@app.post("/scan/barcode")
async def scan_barcode(barcode: str):
    """Look up a product by barcode, scraping it when it is not stored yet.

    The database is consulted first. On a miss, price listings are fetched
    through ``Scraper.get_all_prices``; the first listing supplies the
    product name, and the new record is saved before being returned.

    Returns:
        The stored product, or the freshly built product record.

    Raises:
        HTTPException: 404 when the scraper yields no listings.
    """
    stored = db.get_product(barcode)
    if stored:
        return stored

    # Database miss: fall back to scraping.
    async with Scraper() as scraper:
        listings = await scraper.get_all_prices(barcode)

    if not listings:
        raise HTTPException(status_code=404, detail="Product not found")

    # The first listing is treated as the source of the product name.
    record = {
        'barcode': barcode,
        'name': listings[0]['title'],
        'prices': listings,
        'last_updated': datetime.now().isoformat(),
    }
    db.add_product(record)
    return record
@app.post("/scan/image")
async def scan_image(file: UploadFile = File(...)):
    """Scan a product image for ingredients.

    The upload is decoded with Pillow, run through the module-level EasyOCR
    reader, and the recognised text is passed to the scraper's extraction
    helpers and the health-score calculator.

    Returns:
        A dict with ``ingredients``, ``nutrition_info``, ``health_score``
        and ``nutrition_analysis``.

    Raises:
        HTTPException: 400 when the upload cannot be decoded as an image.
    """
    contents = await file.read()

    try:
        image = Image.open(io.BytesIO(contents))
        # Image.open is lazy; force decoding here so empty, truncated or
        # non-image uploads are reported as a client error instead of
        # surfacing later as an unhandled 500.
        image.load()
    except OSError as exc:
        raise HTTPException(
            status_code=400,
            detail="Uploaded file is not a valid image",
        ) from exc

    # np.array() on palette ('P'), CMYK, 1-bit and similar modes yields
    # palette indices / booleans / CMYK channels rather than RGB pixel data,
    # so normalise anything that is not already grayscale or RGB.
    if image.mode not in ("L", "RGB"):
        image = image.convert("RGB")

    # Convert to numpy array for EasyOCR
    image_np = np.array(image)

    # Each OCR result carries the recognised text at index 1.
    results = reader.readtext(image_np)
    text = ' '.join(result[1] for result in results)

    # Extract ingredients and nutrition info
    async with Scraper() as scraper:
        ingredients = scraper.extract_ingredients(text)
        nutrition_info = scraper.extract_nutrition_info(text)

    # Calculate health score
    health_score = health_calculator.calculate_score(ingredients)
    nutrition_analysis = health_calculator.analyze_nutrition(nutrition_info)

    return {
        'ingredients': ingredients,
        'nutrition_info': nutrition_info,
        'health_score': health_score,
        'nutrition_analysis': nutrition_analysis,
    }
@app.post("/analyze/text")
async def analyze_text(text: str = Form(...), product_type: str = Form("food")):
    """Analyze a product from a free-text description.

    Ingredients and nutrition facts are pulled out of ``text`` with the
    scraper's extraction helpers, then scored by the health calculator
    (the score takes ``product_type`` into account).

    Returns:
        A dict with ``ingredients``, ``nutrition_info``, ``health_score``
        and ``nutrition_analysis``.
    """
    async with Scraper() as scraper:
        found_ingredients = scraper.extract_ingredients(text)
        found_nutrition = scraper.extract_nutrition_info(text)

    score = health_calculator.calculate_score(found_ingredients, product_type)
    analysis = health_calculator.analyze_nutrition(found_nutrition)

    response = {}
    response['ingredients'] = found_ingredients
    response['nutrition_info'] = found_nutrition
    response['health_score'] = score
    response['nutrition_analysis'] = analysis
    return response
def _parse_ingredients(raw: str) -> list:
    """Turn the submitted ingredients field into a list.

    Accepts either a JSON array or a comma-separated string. A JSON array is
    returned as parsed. Any other input is split on commas, with surrounding
    whitespace stripped and empty entries dropped; a JSON-encoded string is
    unwrapped first so its quotes do not end up in the entries.
    """
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        parsed = raw

    if isinstance(parsed, list):
        return parsed
    if not isinstance(parsed, str):
        # Valid JSON that is not a list or string (number, object, null, ...):
        # fall back to treating the raw field as comma-separated text.
        parsed = raw
    return [item.strip() for item in parsed.split(',') if item.strip()]


@app.post("/products/add")
async def add_product(
    barcode: str = Form(...),
    name: str = Form(...),
    ingredients: str = Form(...),
    product_type: str = Form("food"),
    admin_user: str = Depends(verify_admin)
):
    """Add or update product information (admin only).

    ``ingredients`` may be a JSON array or a comma-separated string. The
    record is stored as verified and attributed to the authenticated admin.

    Returns:
        A dict with a confirmation ``message``.
    """
    product_data = {
        'barcode': barcode,
        'name': name,
        'ingredients': _parse_ingredients(ingredients),
        'product_type': product_type,
        'added_by': admin_user,
        'is_verified': True,
        'last_updated': datetime.now().isoformat()
    }

    db.add_product(product_data)
    return {"message": "Product added successfully"}


@app.get("/products/update")
async def update_products(admin_user: str = Depends(verify_admin)):
    """Refresh stale products from the scraper (admin only).

    The barcodes to refresh come from ``db.get_products_for_update()``
    (per the original description: products not updated in 60 days).
    Each one is re-scraped; products with no scraped prices, or whose
    refresh raises, are skipped.

    Returns:
        A dict whose ``message`` reports how many products were actually
        rewritten, not how many were candidates.
    """
    products_to_update = db.get_products_for_update()
    updated_count = 0

    async with Scraper() as scraper:
        for barcode in products_to_update:
            try:
                prices = await scraper.get_all_prices(barcode)
                if not prices:
                    continue

                # The first listing supplies the product name.
                first_result = prices[0]
                db.add_product({
                    'barcode': barcode,
                    'name': first_result['title'],
                    'prices': prices,
                    'last_updated': datetime.now().isoformat()
                })
                updated_count += 1
            except Exception as e:
                # Deliberately best-effort: one failing product must not
                # abort the rest of the batch.
                print(f"Error updating product {barcode}: {str(e)}")
                continue

    return {"message": f"Updated {updated_count} products"}


# Registered after the fixed "/products/update" route on purpose: routes are
# matched in declaration order, and "/products/{barcode}" would otherwise
# capture GET /products/update with barcode="update".
@app.get("/products/{barcode}")
async def get_product(barcode: str):
    """Get product information by barcode.

    Raises:
        HTTPException: 404 when the database has no such product.
    """
    product = db.get_product(barcode)
    if not product:
        raise HTTPException(status_code=404, detail="Product not found")
    return product
@app.get("/export")
async def export_data(admin_user: str = Depends(verify_admin)):
    """Export the database to CSV under ``data/exports`` (admin only).

    The export path is ``data/exports/export_<YYYYmmdd_HHMMSS>`` and is
    handed to ``db.export_to_csv``.

    Returns:
        A dict whose ``message`` names the export path.

    Raises:
        HTTPException: 500 carrying the underlying error text when any
            step fails.
    """
    try:
        target_dir = "data/exports"
        os.makedirs(target_dir, exist_ok=True)

        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        destination = f"{target_dir}/export_{stamp}"
        db.export_to_csv(destination)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Export failed: {str(e)}")
    else:
        return {"message": f"Data exported to {destination}"}
@app.post("/import")
async def import_data(file: UploadFile = File(...), admin_user: str = Depends(verify_admin)):
    """Import data from an uploaded CSV (admin only).

    The upload is saved as ``data/imports/import_<YYYYmmdd_HHMMSS>`` and
    that path is handed to ``db.import_from_csv``.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 500 carrying the underlying error text when any
            step fails.
    """
    try:
        payload = await file.read()

        target_dir = "data/imports"
        os.makedirs(target_dir, exist_ok=True)
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        saved_path = f"{target_dir}/import_{stamp}"

        # Persist the upload first; the database imports from the file path.
        with open(saved_path, 'wb') as out:
            out.write(payload)

        db.import_from_csv(saved_path)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Import failed: {str(e)}")
    else:
        return {"message": "Data imported successfully"}
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces,
    # port 8000. uvicorn is imported here so it is only required when the
    # module is run directly.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)