IndiScan / app /main.py
Wendgan's picture
Upload 9 files
2ae3f7c verified
from fastapi import FastAPI, File, UploadFile, HTTPException, Depends, Form
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from fastapi.middleware.cors import CORSMiddleware
import easyocr
import numpy as np
from PIL import Image
import io
import asyncio
from typing import Optional, List, Dict
import json
import os
from datetime import datetime
# Import our utilities
from utils.scraper import Scraper
from utils.health_score import HealthScoreCalculator
from models.database import Database
app = FastAPI(title="IndiScan API")
security = HTTPBasic()
db = Database()
health_calculator = HealthScoreCalculator()
# Configure CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Allows all origins
allow_credentials=True,
allow_methods=["*"], # Allows all methods
allow_headers=["*"], # Allows all headers
)
# Initialize OCR reader
reader = easyocr.Reader(['en'])
def verify_admin(credentials: HTTPBasicCredentials = Depends(security)):
is_admin = db.verify_admin(credentials.username, credentials.password)
if not is_admin:
raise HTTPException(
status_code=401,
detail="Invalid credentials",
headers={"WWW-Authenticate": "Basic"},
)
return credentials.username
@app.post("/scan/barcode")
async def scan_barcode(barcode: str):
"""Scan a product by barcode"""
# First check our database
product = db.get_product(barcode)
if product:
return product
# If not found, scrape from various sources
async with Scraper() as scraper:
prices = await scraper.get_all_prices(barcode)
if not prices:
raise HTTPException(status_code=404, detail="Product not found")
# Use the first result to get product details
first_result = prices[0]
product_data = {
'barcode': barcode,
'name': first_result['title'],
'prices': prices,
'last_updated': datetime.now().isoformat()
}
# Add to database
db.add_product(product_data)
return product_data
@app.post("/scan/image")
async def scan_image(file: UploadFile = File(...)):
"""Scan product image for ingredients"""
contents = await file.read()
image = Image.open(io.BytesIO(contents))
# Convert to numpy array for EasyOCR
image_np = np.array(image)
# Extract text from image
results = reader.readtext(image_np)
text = ' '.join([result[1] for result in results])
# Extract ingredients and nutrition info
async with Scraper() as scraper:
ingredients = scraper.extract_ingredients(text)
nutrition_info = scraper.extract_nutrition_info(text)
# Calculate health score
health_score = health_calculator.calculate_score(ingredients)
nutrition_analysis = health_calculator.analyze_nutrition(nutrition_info)
return {
'ingredients': ingredients,
'nutrition_info': nutrition_info,
'health_score': health_score,
'nutrition_analysis': nutrition_analysis
}
@app.post("/analyze/text")
async def analyze_text(text: str = Form(...), product_type: str = Form("food")):
"""Analyze product from text description"""
async with Scraper() as scraper:
ingredients = scraper.extract_ingredients(text)
nutrition_info = scraper.extract_nutrition_info(text)
health_score = health_calculator.calculate_score(ingredients, product_type)
nutrition_analysis = health_calculator.analyze_nutrition(nutrition_info)
return {
'ingredients': ingredients,
'nutrition_info': nutrition_info,
'health_score': health_score,
'nutrition_analysis': nutrition_analysis
}
@app.get("/products/{barcode}")
async def get_product(barcode: str):
"""Get product information by barcode"""
product = db.get_product(barcode)
if not product:
raise HTTPException(status_code=404, detail="Product not found")
return product
@app.post("/products/add")
async def add_product(
barcode: str = Form(...),
name: str = Form(...),
ingredients: str = Form(...),
product_type: str = Form("food"),
admin_user: str = Depends(verify_admin)
):
"""Add or update product information (admin only)"""
try:
ingredients_list = json.loads(ingredients)
except json.JSONDecodeError:
ingredients_list = [i.strip() for i in ingredients.split(',')]
product_data = {
'barcode': barcode,
'name': name,
'ingredients': ingredients_list,
'product_type': product_type,
'added_by': admin_user,
'is_verified': True,
'last_updated': datetime.now().isoformat()
}
db.add_product(product_data)
return {"message": "Product added successfully"}
@app.get("/products/update")
async def update_products(admin_user: str = Depends(verify_admin)):
"""Update products that haven't been updated in 60 days (admin only)"""
products_to_update = db.get_products_for_update()
async with Scraper() as scraper:
for barcode in products_to_update:
try:
prices = await scraper.get_all_prices(barcode)
if prices:
first_result = prices[0]
product_data = {
'barcode': barcode,
'name': first_result['title'],
'prices': prices,
'last_updated': datetime.now().isoformat()
}
db.add_product(product_data)
except Exception as e:
print(f"Error updating product {barcode}: {str(e)}")
continue
return {"message": f"Updated {len(products_to_update)} products"}
@app.get("/export")
async def export_data(admin_user: str = Depends(verify_admin)):
"""Export database to CSV (admin only)"""
try:
export_dir = "data/exports"
os.makedirs(export_dir, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
export_path = f"{export_dir}/export_{timestamp}"
db.export_to_csv(export_path)
return {"message": f"Data exported to {export_path}"}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Export failed: {str(e)}")
@app.post("/import")
async def import_data(file: UploadFile = File(...), admin_user: str = Depends(verify_admin)):
"""Import data from CSV (admin only)"""
try:
contents = await file.read()
import_dir = "data/imports"
os.makedirs(import_dir, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
import_path = f"{import_dir}/import_{timestamp}"
with open(import_path, 'wb') as f:
f.write(contents)
db.import_from_csv(import_path)
return {"message": "Data imported successfully"}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Import failed: {str(e)}")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)