"""IndiScan API: FastAPI endpoints for scanning and managing product data."""

import asyncio
import io
import json
import logging
import os
from datetime import datetime
from typing import Optional, List, Dict

import easyocr
import numpy as np
from fastapi import FastAPI, File, UploadFile, HTTPException, Depends, Form
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from PIL import Image

# Import our utilities
from utils.scraper import Scraper
from utils.health_score import HealthScoreCalculator
from models.database import Database

logger = logging.getLogger(__name__)

app = FastAPI(title="IndiScan API")
security = HTTPBasic()
db = Database()
health_calculator = HealthScoreCalculator()

# Configure CORS
# NOTE(review): the FastAPI CORS documentation says wildcard origins cannot be
# combined with allow_credentials=True; confirm the intended origin list.
# The configuration itself is left exactly as it was.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allows all origins
    allow_credentials=True,
    allow_methods=["*"],  # Allows all methods
    allow_headers=["*"],  # Allows all headers
)

# Initialize OCR reader
reader = easyocr.Reader(['en'])


def verify_admin(credentials: HTTPBasicCredentials = Depends(security)):
    """Dependency that authenticates an admin via HTTP Basic credentials.

    Returns:
        The username from the supplied credentials.

    Raises:
        HTTPException: 401 (with a ``WWW-Authenticate: Basic`` header) when
            ``db.verify_admin`` returns a falsy value.
    """
    is_admin = db.verify_admin(credentials.username, credentials.password)
    if not is_admin:
        raise HTTPException(
            status_code=401,
            detail="Invalid credentials",
            headers={"WWW-Authenticate": "Basic"},
        )
    return credentials.username


def _build_scraped_product(barcode: str, prices: list) -> dict:
    """Build a product record from a non-empty list of scraped price entries."""
    # The first result supplies the product name; it is read via its
    # 'title' key, so every entry is expected to carry one.
    first_result = prices[0]
    return {
        'barcode': barcode,
        'name': first_result['title'],
        'prices': prices,
        'last_updated': datetime.now().isoformat()
    }


def _parse_ingredients(raw: str) -> List[str]:
    """Parse ingredients given as a JSON list or a comma-separated string."""
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        parsed = None
    # Valid JSON that is not a list (e.g. "123" or a quoted string) is not an
    # ingredient list; treat it like plain text instead of storing a scalar.
    if isinstance(parsed, list):
        return parsed
    return [item.strip() for item in raw.split(',') if item.strip()]


@app.post("/scan/barcode")
async def scan_barcode(barcode: str):
    """Scan a product by barcode.

    Returns the stored product when the database has one; otherwise scrapes
    prices, stores a new record built from them and returns that record.

    Raises:
        HTTPException: 404 when the scraper returns no prices.
    """
    # First check our database
    product = db.get_product(barcode)
    if product:
        return product

    # If not found, scrape from various sources
    async with Scraper() as scraper:
        prices = await scraper.get_all_prices(barcode)

    if not prices:
        raise HTTPException(status_code=404, detail="Product not found")

    product_data = _build_scraped_product(barcode, prices)

    # Add to database
    db.add_product(product_data)
    return product_data


@app.post("/scan/image")
async def scan_image(file: UploadFile = File(...)):
    """Scan product image for ingredients.

    Runs OCR on the uploaded image, extracts ingredients and nutrition info
    from the recognised text, and returns them with the computed health score
    and nutrition analysis.

    Raises:
        HTTPException: 400 when the upload cannot be decoded as an image.
    """
    contents = await file.read()
    try:
        # Normalise palette, bilevel, CMYK, etc. to RGB so the array handed
        # to OCR always holds plain 8-bit colour pixels.
        image = Image.open(io.BytesIO(contents)).convert("RGB")
    except OSError as exc:
        # Pillow signals unreadable or truncated image data with OSError
        # (UnidentifiedImageError is a subclass).
        raise HTTPException(status_code=400, detail="Invalid image file") from exc

    # Convert to numpy array for EasyOCR
    image_np = np.array(image)

    # Extract text from image.
    # NOTE: this is a synchronous call inside an async handler, so the event
    # loop is blocked for the duration of OCR.
    results = reader.readtext(image_np)
    text = ' '.join([result[1] for result in results])

    # Extract ingredients and nutrition info
    async with Scraper() as scraper:
        ingredients = scraper.extract_ingredients(text)
        nutrition_info = scraper.extract_nutrition_info(text)

    # Calculate health score
    health_score = health_calculator.calculate_score(ingredients)
    nutrition_analysis = health_calculator.analyze_nutrition(nutrition_info)

    return {
        'ingredients': ingredients,
        'nutrition_info': nutrition_info,
        'health_score': health_score,
        'nutrition_analysis': nutrition_analysis
    }


@app.post("/analyze/text")
async def analyze_text(text: str = Form(...), product_type: str = Form("food")):
    """Analyze product from text description.

    Extracts ingredients and nutrition info from ``text`` and scores the
    ingredients for the given ``product_type`` (default ``"food"``).
    """
    async with Scraper() as scraper:
        ingredients = scraper.extract_ingredients(text)
        nutrition_info = scraper.extract_nutrition_info(text)

    health_score = health_calculator.calculate_score(ingredients, product_type)
    nutrition_analysis = health_calculator.analyze_nutrition(nutrition_info)

    return {
        'ingredients': ingredients,
        'nutrition_info': nutrition_info,
        'health_score': health_score,
        'nutrition_analysis': nutrition_analysis
    }


# This fixed path must be registered BEFORE "/products/{barcode}": routes are
# matched in declaration order, so the parameterised route would otherwise
# capture GET /products/update with barcode="update".
@app.get("/products/update")
async def update_products(admin_user: str = Depends(verify_admin)):
    """Update products that haven't been updated in 60 days (admin only).

    Re-scrapes every barcode returned by ``db.get_products_for_update()``
    (the 60-day rule lives in that method, not here). Failures for a single
    product are logged and skipped so one bad barcode does not abort the run.

    Returns:
        A message stating how many products were actually refreshed.
    """
    products_to_update = db.get_products_for_update()
    updated_count = 0

    async with Scraper() as scraper:
        for barcode in products_to_update:
            try:
                prices = await scraper.get_all_prices(barcode)
                if prices:
                    db.add_product(_build_scraped_product(barcode, prices))
                    updated_count += 1
            except Exception:
                # Deliberately best-effort: log with traceback and move on.
                logger.exception("Error updating product %s", barcode)
                continue

    # Report only the products that were written, not every candidate.
    return {"message": f"Updated {updated_count} products"}


@app.get("/products/{barcode}")
async def get_product(barcode: str):
    """Get product information by barcode.

    Raises:
        HTTPException: 404 when the database has no product for ``barcode``.
    """
    product = db.get_product(barcode)
    if not product:
        raise HTTPException(status_code=404, detail="Product not found")
    return product


@app.post("/products/add")
async def add_product(
    barcode: str = Form(...),
    name: str = Form(...),
    ingredients: str = Form(...),
    product_type: str = Form("food"),
    admin_user: str = Depends(verify_admin)
):
    """Add or update product information (admin only).

    ``ingredients`` may be a JSON list or a comma-separated string. The
    record is stored as verified and attributed to the authenticated admin.
    """
    product_data = {
        'barcode': barcode,
        'name': name,
        'ingredients': _parse_ingredients(ingredients),
        'product_type': product_type,
        'added_by': admin_user,
        'is_verified': True,
        'last_updated': datetime.now().isoformat()
    }

    db.add_product(product_data)
    return {"message": "Product added successfully"}


@app.get("/export")
async def export_data(admin_user: str = Depends(verify_admin)):
    """Export database to CSV (admin only).

    Creates ``data/exports`` if needed and passes a timestamped path prefix
    to ``db.export_to_csv``.

    Raises:
        HTTPException: 500 when any step of the export fails.
    """
    try:
        export_dir = "data/exports"
        os.makedirs(export_dir, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        export_path = f"{export_dir}/export_{timestamp}"

        db.export_to_csv(export_path)
        return {"message": f"Data exported to {export_path}"}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Export failed: {str(exc)}") from exc


@app.post("/import")
async def import_data(file: UploadFile = File(...), admin_user: str = Depends(verify_admin)):
    """Import data from CSV (admin only).

    Saves the upload under ``data/imports`` with a timestamped name (the
    client-supplied filename is never used) and passes that path to
    ``db.import_from_csv``.

    Raises:
        HTTPException: 500 when any step of the import fails.
    """
    try:
        contents = await file.read()
        import_dir = "data/imports"
        os.makedirs(import_dir, exist_ok=True)

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        import_path = f"{import_dir}/import_{timestamp}"

        with open(import_path, 'wb') as f:
            f.write(contents)

        db.import_from_csv(import_path)
        return {"message": "Data imported successfully"}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Import failed: {str(exc)}") from exc


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)