Spaces:
Sleeping
Sleeping
| # Eatlytic v4 - Single file for HuggingFace Spaces | |
# ──────────────────────────────────────────────────────────────────────
# IMPORTS
# ──────────────────────────────────────────────────────────────────────
| import os, re, json, asyncio, logging, hashlib, datetime, secrets | |
| import sqlite3, threading, base64, hmac, uuid | |
| from contextlib import contextmanager | |
| from io import BytesIO | |
| import numpy as np | |
| import cv2 | |
| from PIL import Image, ImageDraw, ImageFont | |
| from fastapi import FastAPI, File, UploadFile, Form, Request, HTTPException, Security | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import FileResponse, JSONResponse, Response | |
| from fastapi.security import APIKeyHeader | |
| from slowapi import Limiter, _rate_limit_exceeded_handler | |
| from slowapi.util import get_remote_address | |
| from slowapi.errors import RateLimitExceeded | |
| try: | |
| from duckduckgo_search import DDGS as _DDGS; _DDGS_OK = True | |
| except Exception: | |
| _DDGS = None; _DDGS_OK = False | |
# Configure root logging at INFO level; `logger` is the module-level logger
# used by the rest of this file.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ──────────────────────────────────────────────────────────────────────
# CONFIGURATION
# ──────────────────────────────────────────────────────────────────────
# Runtime configuration, read once at import time from environment variables.
FREE_SCAN_LIMIT = int(os.environ.get("FREE_SCAN_LIMIT", "10"))  # free scans per month
GROQ_API_KEY = os.environ.get("GROQ_API_KEY", "")
ADMIN_TOKEN = os.environ.get("ADMIN_TOKEN", "changeme")
MAX_IMAGE_BYTES = 10 * 1024 * 1024  # upload size cap enforced by validate_image
RAZORPAY_KEY_ID = os.environ.get("RAZORPAY_KEY_ID", "")
RAZORPAY_KEY_SECRET = os.environ.get("RAZORPAY_KEY_SECRET", "")

# Warn loudly (but keep starting) when the deployment is misconfigured.
if ADMIN_TOKEN == "changeme":
    logger.warning("⚠️ ADMIN_TOKEN is default — set it in HuggingFace Secrets")
if not GROQ_API_KEY:
    logger.warning("⚠️ GROQ_API_KEY missing — all analysis will fail")

# Attached to every analysis result.
# NOTE(review): the leading symbol was unreadable in the source; the medical
# symbol is assumed here — confirm against the original file.
MEDICAL_DISCLAIMER = (
    "⚕️ For informational purposes only — not medical advice. "
    "Consult a qualified nutritionist or physician before making dietary decisions."
)

# Language code -> language name inserted into the LLM prompt.
LANGUAGE_MAP = {
    "en": "English", "zh": "Simplified Chinese", "es": "Spanish",
    "ar": "Arabic", "fr": "French", "hi": "Hindi (हिन्दी)",
    "pt": "Portuguese", "de": "German",
}
# ──────────────────────────────────────────────────────────────────────
# DATABASE
# ──────────────────────────────────────────────────────────────────────
# Filesystem layout: the SQLite database lives under ./data, and EasyOCR model
# files under the cache directory (HF_HOME when set, otherwise /app/.cache).
DATA_DIR = os.path.join(os.getcwd(), "data")
DB_FILE = os.path.join(DATA_DIR, "eatlytic.db")
CACHE_DIR = os.environ.get("HF_HOME", "/app/.cache")
MODEL_DIR = os.path.join(CACHE_DIR, "easyocr_models")

# Both directories must exist before the database or OCR models are touched.
os.makedirs(DATA_DIR, exist_ok=True)
os.makedirs(MODEL_DIR, exist_ok=True)
def _get_connection():
    """Open a fresh SQLite connection to ``DB_FILE``.

    Rows are returned as ``sqlite3.Row`` (accessible by column name). The
    connection is switched to WAL journaling, foreign-key enforcement and
    ``synchronous=NORMAL`` before it is handed back.
    """
    connection = sqlite3.connect(DB_FILE, check_same_thread=False, timeout=15)
    connection.row_factory = sqlite3.Row
    for setting in ("journal_mode=WAL", "foreign_keys=ON", "synchronous=NORMAL"):
        connection.execute("PRAGMA " + setting)
    return connection
@contextmanager
def db_conn():
    """Context manager yielding a SQLite connection with transaction handling.

    Usage: ``with db_conn() as conn: ...``. The transaction is committed when
    the ``with`` body finishes normally, rolled back (and the exception
    re-raised) when it fails, and the connection is always closed.

    Without the ``@contextmanager`` decorator this function is a plain
    generator, which does not implement the context-manager protocol, so every
    ``with db_conn()`` in this file would fail.
    """
    conn = _get_connection()
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()
def init_db():
    """Create every table and index the app uses, if they do not exist yet.

    The whole schema is applied in a single ``executescript`` call; all
    statements use ``IF NOT EXISTS`` so running this repeatedly is harmless.
    """
    schema = """
        CREATE TABLE IF NOT EXISTS users (
            id TEXT PRIMARY KEY, email TEXT UNIQUE, phone TEXT UNIQUE,
            name TEXT DEFAULT '', created_at TEXT DEFAULT (datetime('now')),
            is_pro INTEGER DEFAULT 0, pro_expires TEXT,
            scan_count_month INTEGER DEFAULT 0, scan_month TEXT DEFAULT '',
            streak_days INTEGER DEFAULT 0, last_scan_date TEXT DEFAULT '',
            tdee REAL DEFAULT 0, persona TEXT DEFAULT 'General Adult',
            language TEXT DEFAULT 'en', onboarding_done INTEGER DEFAULT 0
        );
        CREATE TABLE IF NOT EXISTS sessions (
            token TEXT PRIMARY KEY, user_id TEXT NOT NULL,
            created_at TEXT DEFAULT (datetime('now')),
            expires_at TEXT NOT NULL, device_hint TEXT DEFAULT ''
        );
        CREATE TABLE IF NOT EXISTS devices (
            device_key TEXT PRIMARY KEY, user_id TEXT,
            created_at TEXT DEFAULT (datetime('now')),
            is_pro INTEGER DEFAULT 0, month TEXT DEFAULT '',
            scan_count INTEGER DEFAULT 0, streak_days INTEGER DEFAULT 0,
            last_scan_date TEXT DEFAULT '', persona TEXT DEFAULT 'General Adult',
            language TEXT DEFAULT 'en', tdee REAL DEFAULT 0,
            onboarding_done INTEGER DEFAULT 0
        );
        CREATE TABLE IF NOT EXISTS scans (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id TEXT, device_key TEXT,
            product_name TEXT DEFAULT 'Unknown', score INTEGER DEFAULT 0,
            verdict TEXT DEFAULT '', calories REAL DEFAULT 0,
            protein REAL DEFAULT 0, carbs REAL DEFAULT 0, fat REAL DEFAULT 0,
            sodium REAL DEFAULT 0, fiber REAL DEFAULT 0, sugar REAL DEFAULT 0,
            persona TEXT DEFAULT '', language TEXT DEFAULT 'en',
            scanned_at TEXT DEFAULT (datetime('now')), analysis_json TEXT DEFAULT '{}'
        );
        CREATE TABLE IF NOT EXISTS daily_logs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id TEXT, device_key TEXT, log_date TEXT NOT NULL,
            meal_name TEXT DEFAULT '', calories REAL DEFAULT 0,
            protein REAL DEFAULT 0, carbs REAL DEFAULT 0, fat REAL DEFAULT 0,
            sodium REAL DEFAULT 0, fiber REAL DEFAULT 0, sugar REAL DEFAULT 0,
            source TEXT DEFAULT 'scan', logged_at TEXT DEFAULT (datetime('now'))
        );
        CREATE TABLE IF NOT EXISTS allergen_profiles (
            device_key TEXT PRIMARY KEY, user_id TEXT,
            allergens TEXT DEFAULT '[]', conditions TEXT DEFAULT '[]',
            updated_at TEXT DEFAULT (datetime('now'))
        );
        CREATE TABLE IF NOT EXISTS food_products (
            id INTEGER PRIMARY KEY AUTOINCREMENT, barcode TEXT UNIQUE,
            name TEXT NOT NULL, brand TEXT DEFAULT '', category TEXT DEFAULT '',
            calories_100g REAL DEFAULT 0, protein_100g REAL DEFAULT 0,
            carbs_100g REAL DEFAULT 0, fat_100g REAL DEFAULT 0,
            sodium_100g REAL DEFAULT 0, fiber_100g REAL DEFAULT 0,
            sugar_100g REAL DEFAULT 0, sat_fat_100g REAL DEFAULT 0,
            eatlytic_score INTEGER DEFAULT 0, ingredients_raw TEXT DEFAULT '',
            source TEXT DEFAULT 'llm_scan', scan_count INTEGER DEFAULT 0,
            verified INTEGER DEFAULT 0, created_at TEXT DEFAULT (datetime('now')),
            updated_at TEXT DEFAULT (datetime('now'))
        );
        CREATE TABLE IF NOT EXISTS benchmarks (
            id INTEGER PRIMARY KEY AUTOINCREMENT, product_name TEXT NOT NULL,
            ground_truth_json TEXT NOT NULL, llm_output_json TEXT DEFAULT '{}',
            ocr_text TEXT DEFAULT '', f1_score REAL DEFAULT 0,
            score_delta REAL DEFAULT 0, field_accuracy TEXT DEFAULT '{}',
            tested_at TEXT DEFAULT (datetime('now')), model_used TEXT DEFAULT ''
        );
        CREATE TABLE IF NOT EXISTS nps_responses (
            id INTEGER PRIMARY KEY AUTOINCREMENT, device_key TEXT, user_id TEXT,
            score INTEGER NOT NULL, comment TEXT DEFAULT '',
            submitted_at TEXT DEFAULT (datetime('now'))
        );
        CREATE TABLE IF NOT EXISTS payments (
            id INTEGER PRIMARY KEY AUTOINCREMENT, user_id TEXT, device_key TEXT,
            razorpay_order_id TEXT UNIQUE, razorpay_payment_id TEXT UNIQUE,
            razorpay_signature TEXT DEFAULT '', amount_paise INTEGER DEFAULT 19900,
            currency TEXT DEFAULT 'INR', status TEXT DEFAULT 'created',
            plan TEXT DEFAULT 'pro_monthly',
            created_at TEXT DEFAULT (datetime('now')), paid_at TEXT DEFAULT NULL
        );
        CREATE TABLE IF NOT EXISTS api_keys (
            api_key TEXT PRIMARY KEY, client_name TEXT NOT NULL,
            plan TEXT DEFAULT 'business', scans_this_month INTEGER DEFAULT 0,
            month TEXT DEFAULT '', active INTEGER DEFAULT 1,
            created_at TEXT DEFAULT (datetime('now'))
        );
        CREATE TABLE IF NOT EXISTS ocr_cache (
            cache_key TEXT PRIMARY KEY, result_json TEXT NOT NULL,
            created_at TEXT DEFAULT (datetime('now'))
        );
        CREATE TABLE IF NOT EXISTS ai_cache (
            cache_key TEXT PRIMARY KEY, result_json TEXT NOT NULL,
            created_at TEXT DEFAULT (datetime('now'))
        );
        CREATE INDEX IF NOT EXISTS idx_scans_device ON scans(device_key);
        CREATE INDEX IF NOT EXISTS idx_daily_dev_date ON daily_logs(device_key, log_date);
        CREATE INDEX IF NOT EXISTS idx_food_name ON food_products(name);
        """
    with db_conn() as conn:
        conn.executescript(schema)
    logger.info("Database ready: %s", DB_FILE)
def _get_ocr_cache(key):
    """Return the cached OCR result stored under ``key``, or ``None``.

    The cache is best-effort: any database or JSON error is logged (matching
    ``_set_ocr_cache``) and treated as a cache miss instead of being swallowed
    silently.
    """
    try:
        with db_conn() as c:
            row = c.execute(
                "SELECT result_json FROM ocr_cache WHERE cache_key=?", (key,)
            ).fetchone()
            return json.loads(row["result_json"]) if row else None
    except Exception as e:
        logger.warning("ocr_cache get: %s", e)
        return None
def _set_ocr_cache(key, val):
    """Store ``val`` (JSON-serialised) in the OCR cache under ``key``.

    Best-effort: a failure is logged as a warning and otherwise ignored.
    """
    statement = "INSERT OR REPLACE INTO ocr_cache(cache_key,result_json) VALUES(?,?)"
    try:
        with db_conn() as connection:
            payload = json.dumps(val)
            connection.execute(statement, (key, payload))
    except Exception as e:
        logger.warning("ocr_cache set: %s", e)
def _get_ai_cache(key):
    """Return the cached LLM analysis stored under ``key``, or ``None``.

    The cache is best-effort: any database or JSON error is logged (matching
    ``_set_ai_cache``) and treated as a cache miss instead of being swallowed
    silently.
    """
    try:
        with db_conn() as c:
            row = c.execute(
                "SELECT result_json FROM ai_cache WHERE cache_key=?", (key,)
            ).fetchone()
            return json.loads(row["result_json"]) if row else None
    except Exception as e:
        logger.warning("ai_cache get: %s", e)
        return None
def _set_ai_cache(key, val):
    """Store ``val`` (JSON-serialised) in the LLM-analysis cache under ``key``.

    Best-effort: a failure is logged as a warning and otherwise ignored.
    """
    statement = "INSERT OR REPLACE INTO ai_cache(cache_key,result_json) VALUES(?,?)"
    try:
        with db_conn() as connection:
            payload = json.dumps(val)
            connection.execute(statement, (key, payload))
    except Exception as e:
        logger.warning("ai_cache set: %s", e)
# Create the schema at import time so every later helper can assume the tables exist.
init_db()
# ──────────────────────────────────────────────────────────────────────
# AUTH
# ──────────────────────────────────────────────────────────────────────
# Lifetime of a login session token, in days.
SESSION_TTL_DAYS = 30
# In-memory OTP store: lower-cased email -> (otp string, expiry datetime).
# Lives only in this process, so pending codes are lost on restart.
_pending_otps: dict = {}
def _get_or_create_user(email=None, phone=None, name=""):
    """Fetch the user identified by ``email`` (preferred) or ``phone``, creating it if absent.

    Args:
        email: Email address; used as the lookup key when truthy.
        phone: Phone number; used as the lookup key only when ``email`` is falsy.
        name: Display name stored on a newly created user.

    Returns:
        The complete ``users`` row as a dict. A freshly created user is read
        back from the database, so it carries the same columns (including
        schema defaults such as ``persona`` and ``language``) as an existing one.

    Raises:
        ValueError: If neither ``email`` nor ``phone`` is given.
    """
    if not email and not phone:
        raise ValueError("email or phone required")
    lookup = "SELECT * FROM users WHERE email=?" if email else "SELECT * FROM users WHERE phone=?"
    with db_conn() as conn:
        row = conn.execute(lookup, (email or phone,)).fetchone()
        if row:
            return dict(row)
        uid = str(uuid.uuid4())
        conn.execute(
            "INSERT INTO users(id,email,phone,name) VALUES(?,?,?,?)",
            (uid, email, phone, name),
        )
        # Re-read the inserted row so callers see every column with its default,
        # not a hand-maintained subset.
        created = conn.execute("SELECT * FROM users WHERE id=?", (uid,)).fetchone()
        return dict(created)
def _create_session(user_id, device_hint=""):
    """Create a session row for ``user_id`` and return its token.

    Args:
        user_id: ID of the user the session belongs to.
        device_hint: Free-form device description stored with the session.

    Returns:
        The new session token (``"eat_"`` followed by a URL-safe random string).
    """
    token = "eat_" + secrets.token_urlsafe(40)
    expiry = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=SESSION_TTL_DAYS)
    # Store the expiry in the same "YYYY-MM-DD HH:MM:SS" UTC text format that
    # SQLite's datetime('now') produces. The session lookup compares the two
    # as strings; an isoformat() value ("...T...") always sorts after a
    # same-day datetime('now') value ("... ..."), which kept sessions alive
    # until the end of their expiry day.
    expires = expiry.strftime("%Y-%m-%d %H:%M:%S")
    with db_conn() as conn:
        conn.execute(
            "INSERT INTO sessions(token,user_id,expires_at,device_hint) VALUES(?,?,?,?)",
            (token, user_id, expires, device_hint),
        )
    return token
def _get_user_from_token(token):
    """Return the user row (as a dict) for a non-expired session token, else ``None``.

    ``expires_at`` is passed through SQLite's ``datetime()`` before the
    comparison so that values stored in ISO-8601 "T" form and values stored in
    SQLite's space-separated form are both compared as timestamps. A raw text
    comparison treats "2024-05-01T00:00:01" as later than
    "2024-05-01 23:59:59", so a session would stay valid for the rest of its
    expiry day.
    """
    if not token:
        return None
    with db_conn() as conn:
        row = conn.execute(
            "SELECT u.* FROM sessions s JOIN users u ON s.user_id=u.id "
            "WHERE s.token=? AND datetime(s.expires_at)>datetime('now')",
            (token,),
        ).fetchone()
        return dict(row) if row else None
def _send_otp(email):
    """Generate a 6-digit one-time code for ``email`` and remember it for 10 minutes.

    The code is stored in ``_pending_otps`` under the lower-cased email and
    returned to the caller.

    NOTE(review): the code is only written to the log ("dev mode"); no message
    is actually sent, and the secret ends up in the log output.
    """
    code = str(100000 + secrets.randbelow(900000))
    deadline = datetime.datetime.utcnow() + datetime.timedelta(minutes=10)
    _pending_otps[email.lower()] = (code, deadline)
    logger.info("OTP for %s: %s (dev mode)", email, code)
    return code
def _verify_otp(email, otp):
    """Check a one-time code and return the matching user on success.

    Args:
        email: Address the code was issued for (matched case-insensitively).
        otp: Code supplied by the client; surrounding whitespace is ignored.

    Returns:
        The user dict from ``_get_or_create_user`` when the code matches and
        has not expired, otherwise ``None``. A successful or expired code is
        removed from the pending store.
    """
    key = email.lower()
    entry = _pending_otps.get(key)
    if not entry:
        return None
    stored, expires = entry
    # Naive UTC on purpose: _send_otp stores a naive utcnow()-based expiry.
    if datetime.datetime.utcnow() > expires:
        # pop() instead of del: another request may have removed the entry already.
        _pending_otps.pop(key, None)
        return None
    supplied = str(otp).strip().encode("utf-8")
    # Constant-time comparison (on bytes, so non-ASCII input cannot raise).
    if not hmac.compare_digest(stored.encode("utf-8"), supplied):
        return None
    _pending_otps.pop(key, None)
    return _get_or_create_user(email=email)
def _check_scan_quota_user(user_id):
    """Consume one scan from the user's monthly quota and report the outcome.

    The monthly counter is reset when the stored month differs from the
    current one. Pro users are always allowed (and still counted); free users
    are refused once ``FREE_SCAN_LIMIT`` is reached.

    Returns:
        Dict with ``allowed``, ``scans_used``, ``scans_remaining`` and
        ``is_pro``. Unknown users get ``allowed=False``.

    NOTE(review): only the ``is_pro`` flag is read here; ``pro_expires`` is not
    consulted in this function — confirm expiry is enforced elsewhere.
    """
    current_month = datetime.date.today().isoformat()[:7]
    with db_conn() as conn:
        record = conn.execute(
            "SELECT is_pro, scan_month, scan_count_month FROM users WHERE id=?", (user_id,)
        ).fetchone()
        if not record:
            return {"allowed": False, "scans_used": 0, "scans_remaining": 0, "is_pro": False}

        if record["scan_month"] == current_month:
            used = record["scan_count_month"]
        else:
            # First scan of a new month: roll the counter over.
            conn.execute(
                "UPDATE users SET scan_month=?, scan_count_month=0 WHERE id=?",
                (current_month, user_id),
            )
            used = 0

        pro = bool(record["is_pro"])
        if not pro and used >= FREE_SCAN_LIMIT:
            return {"allowed": False, "scans_used": used, "scans_remaining": 0, "is_pro": False}

        conn.execute("UPDATE users SET scan_count_month=scan_count_month+1 WHERE id=?", (user_id,))
        used += 1
        remaining = 9999 if pro else FREE_SCAN_LIMIT - used
        return {"allowed": True, "scans_used": used, "scans_remaining": remaining, "is_pro": pro}
def _update_streak_user(user_id):
    """Advance the user's daily scan streak.

    Does nothing for an unknown user or when a scan was already recorded
    today. Otherwise the streak grows by one if the previous scan was
    yesterday, and restarts at 1 if it was earlier.
    """
    current_day = datetime.date.today()
    today = current_day.isoformat()
    yesterday = (current_day - datetime.timedelta(days=1)).isoformat()
    with db_conn() as conn:
        record = conn.execute(
            "SELECT streak_days, last_scan_date FROM users WHERE id=?", (user_id,)
        ).fetchone()
        if not record:
            return
        previous = record["last_scan_date"]
        if previous == today:
            return
        if previous == yesterday:
            streak = record["streak_days"] + 1
        else:
            streak = 1
        conn.execute(
            "UPDATE users SET streak_days=?, last_scan_date=? WHERE id=?",
            (streak, today, user_id),
        )
# ──────────────────────────────────────────────────────────────────────
# IMAGE PROCESSING
# ──────────────────────────────────────────────────────────────────────
def validate_image(content):
    """Validate uploaded image bytes and downscale oversized images.

    Args:
        content: Raw bytes of the uploaded file.

    Returns:
        ``content`` unchanged when the longest side is at most 2048 px;
        otherwise the image resized to a 2048 px longest side and re-encoded
        as JPEG bytes.

    Raises:
        ValueError: If the payload exceeds ``MAX_IMAGE_BYTES`` or cannot be
            decoded as an image.
    """
    size = len(content)
    if size > MAX_IMAGE_BYTES:
        raise ValueError(f"Image too large ({size//1024}KB). Max 10MB.")
    try:
        picture = Image.open(BytesIO(content)).convert("RGB")
    except Exception:
        raise ValueError("Invalid image format. Upload JPEG, PNG, or WebP.")

    width, height = picture.size
    longest = max(width, height)
    if longest <= 2048:
        return content

    ratio = 2048 / longest
    picture = picture.resize((int(width * ratio), int(height * ratio)), Image.LANCZOS)
    out = BytesIO()
    picture.save(out, format="JPEG", quality=92)
    return out.getvalue()
def assess_image_quality(content):
    """Estimate how blurry an image is from four sharpness measures.

    Combines the variance of the Laplacian, the mean squared Sobel gradient,
    the mean squared difference between pixels two columns apart, and the
    median Laplacian variance over 64x64 tiles into a 0-100 composite score.

    Returns:
        Dict with ``blur_score``, ``is_blurry``, ``blur_severity``
        (severe/moderate/mild/none) and ``quality`` (poor/fair/good). If
        anything fails, the error is logged and a "not blurry / unknown"
        result with ``blur_score`` 999 is returned.
    """
    try:
        rgb = np.array(Image.open(BytesIO(content)).convert("RGB"))
        gray = cv2.cvtColor(rgb, cv2.COLOR_RGB2GRAY)

        laplacian_var = float(cv2.Laplacian(gray, cv2.CV_64F).var())

        sobel_x = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3)
        sobel_y = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=3)
        gradient_energy = float(np.mean(sobel_x**2 + sobel_y**2))

        column_diff = gray[:, 2:].astype(np.float64) - gray[:, :-2].astype(np.float64)
        column_energy = float(np.mean(column_diff**2))

        # Per-tile Laplacian variance; the median is less sensitive to a few
        # very sharp or very flat regions than the global value.
        height, width = gray.shape
        tile_vars = []
        for top in range(0, height - 64, 64):
            for left in range(0, width - 64, 64):
                tile = gray[top:top + 64, left:left + 64]
                tile_vars.append(cv2.Laplacian(tile, cv2.CV_64F).var())
        local_var = float(np.median(tile_vars)) if tile_vars else 0.0

        # Each measure is scaled to 0-100 (capped) and weighted.
        comp = (0.25*min(laplacian_var/300*100,100) + 0.20*min(gradient_energy/500*100,100) +
                0.20*min(column_energy/200*100,100) + 0.35*min(local_var/300*100,100))

        if comp < 15:
            severity = "severe"
        elif comp < 35:
            severity = "moderate"
        elif comp < 55:
            severity = "mild"
        else:
            severity = "none"
        blurry = severity != "none"

        if comp < 35:
            quality = "poor"
        elif comp < 55:
            quality = "fair"
        else:
            quality = "good"

        return {"blur_score": round(comp, 2), "is_blurry": blurry,
                "blur_severity": severity, "quality": quality}
    except Exception as e:
        logger.error("Blur detection: %s", e)
        return {"blur_score": 999, "is_blurry": False, "blur_severity": "unknown", "quality": "unknown"}
def deblur_and_enhance(content, severity="moderate"):
    """Sharpen a blurry label photo and return it as JPEG bytes.

    Pipeline (each applied step is recorded in the returned log):
      1. upscale so the shorter side is at least 1200 px;
      2. non-local-means denoising (severe/moderate only);
      3. Wiener deconvolution of the luminance with a Gaussian PSF
         (every severity except "mild");
      4. unsharp masking;
      5. CLAHE contrast enhancement on the L channel.

    Args:
        content: Encoded image bytes.
        severity: "severe", "moderate" or "mild"; selects filter strengths.

    Returns:
        Tuple ``(jpeg_bytes, log)`` where ``log`` is the applied steps joined
        by " → ".
    """
    img_np = np.array(Image.open(BytesIO(content)).convert("RGB"))
    log = []

    h, w = img_np.shape[:2]
    if min(h, w) < 1200:
        s = 1200 / min(h, w)
        img_np = cv2.resize(img_np, (int(w * s), int(h * s)), interpolation=cv2.INTER_LANCZOS4)
        log.append("upscale")

    if severity in ("severe", "moderate"):
        strength = 8 if severity == "severe" else 5
        bgr = cv2.cvtColor(img_np, cv2.COLOR_RGB2BGR)
        denoised = cv2.fastNlMeansDenoisingColored(bgr, None, strength, strength, 7, 21)
        img_np = cv2.cvtColor(denoised, cv2.COLOR_BGR2RGB)
        log.append("NLM")

    if severity != "mild":
        gray = cv2.cvtColor(img_np, cv2.COLOR_RGB2GRAY)
        psf_s = 9 if severity == "severe" else 5
        kr = 0.01 if severity == "severe" else 0.025  # Wiener regularisation term
        psf = cv2.getGaussianKernel(psf_s, psf_s / 3.0)
        psf = psf @ psf.T
        psf /= psf.sum()
        pad = np.zeros_like(gray, dtype=np.float64)
        pad[:psf.shape[0], :psf.shape[1]] = psf
        # Move the kernel centre (index k//2) to the origin. The parentheses
        # matter: `-k//2` parses as `(-k)//2`, which is -(k//2)-1 for odd k and
        # shifted the restored luminance one pixel against the chroma channels.
        pad = np.roll(pad, (-(psf.shape[0] // 2), -(psf.shape[1] // 2)), axis=(0, 1))
        Y = np.fft.fft2(gray.astype(np.float64) / 255.0)
        H = np.fft.fft2(pad)
        W = np.conj(H) / (np.abs(H) ** 2 + kr)
        rest = np.clip(np.real(np.fft.ifft2(W * Y)) * 255.0, 0, 255).astype(np.uint8)
        # Put the restored luminance back while keeping the original colour.
        lab = cv2.cvtColor(img_np, cv2.COLOR_RGB2LAB)
        lab[:, :, 0] = rest
        img_np = cv2.cvtColor(lab, cv2.COLOR_LAB2RGB)
        log.append(f"Wiener(psf={psf_s})")

    # Unsharp mask: add back a scaled copy of (image - blurred image).
    sm = {"severe": 2.2, "moderate": 1.8, "mild": 1.2}
    rm = {"severe": 4, "moderate": 3, "mild": 2}
    s = sm.get(severity, 1.8)
    r = rm.get(severity, 3)
    blurred = cv2.GaussianBlur(img_np, (r * 2 + 1, r * 2 + 1), 0)
    mask = cv2.subtract(img_np.astype(np.int16), blurred.astype(np.int16))
    img_np = np.clip(img_np.astype(np.float32) + s * mask, 0, 255).astype(np.uint8)
    log.append("unsharp")

    lab = cv2.cvtColor(img_np, cv2.COLOR_RGB2LAB)
    clip_limit = {"severe": 3.0, "moderate": 2.5, "mild": 1.8}.get(severity, 2.5)
    cl = cv2.createCLAHE(clipLimit=clip_limit, tileGridSize=(8, 8))
    lab[:, :, 0] = cl.apply(lab[:, :, 0])
    img_np = cv2.cvtColor(lab, cv2.COLOR_LAB2RGB)
    log.append("CLAHE")

    buf = BytesIO()
    Image.fromarray(img_np).save(buf, format="JPEG", quality=92)
    return buf.getvalue(), " → ".join(log)
def image_to_b64(content):
    """Return ``content`` as a ``data:image/jpeg;base64,...`` URI string."""
    encoded = base64.b64encode(content).decode()
    return f"data:image/jpeg;base64,{encoded}"
def ocr_quality_score(r):
    """Score an OCR result dict: 0.6 per word plus 40 x average confidence.

    Missing ``word_count`` / ``avg_confidence`` keys count as 0.
    """
    words = r.get("word_count", 0)
    confidence = r.get("avg_confidence", 0)
    return words * 0.6 + confidence * 100 * 0.4
# ──────────────────────────────────────────────────────────────────────
# OCR
# ──────────────────────────────────────────────────────────────────────
# Cache of loaded EasyOCR readers keyed by their sorted language list, plus the
# lock that guards creation of new readers.
_LANG_READERS = {}; _READERS_LOCK = threading.Lock()
# UI language hint -> EasyOCR language codes to load. Hints not listed here
# fall back to English-only in _get_reader.
_EASYOCR_LANG_MAP = {
    "en":["en"],"hi":["en","hi"],"zh":["en","ch_sim"],
    "ta":["en","ta"],"te":["en","te"],"bn":["en","bn"],
}
def _get_reader(lang_hint):
    """Return the EasyOCR reader for ``lang_hint``, loading it on first use.

    Readers are cached in ``_LANG_READERS``. Creation happens under
    ``_READERS_LOCK`` with a second check inside the lock, so concurrent
    callers do not load the same model twice. Unknown hints use English only.
    """
    langs = _EASYOCR_LANG_MAP.get(lang_hint, ["en"])
    key = "_".join(sorted(langs))
    if key in _LANG_READERS:
        return _LANG_READERS[key]
    with _READERS_LOCK:
        # Another thread may have created the reader while we waited.
        if key not in _LANG_READERS:
            import easyocr as _easyocr  # imported lazily: heavy dependency
            logger.info("Loading EasyOCR for %s", langs)
            reader = _easyocr.Reader(langs, gpu=False, model_storage_directory=MODEL_DIR)
            _LANG_READERS[key] = reader
    return _LANG_READERS[key]
def run_ocr(content, lang_hint="en"):
    """Run OCR on image bytes, using the database cache when possible.

    Args:
        content: Encoded image bytes.
        lang_hint: Language code used to pick the EasyOCR reader.

    Returns:
        Dict with ``text`` (words joined by spaces), ``word_count``,
        ``avg_confidence`` (rounded to 3 decimals) and ``is_readable``
        (at least 3 words and average confidence above 0.15).
    """
    # MD5 is used only as a cache fingerprint of the image bytes.
    fingerprint = hashlib.md5(content).hexdigest()
    cache_key = f"{fingerprint}_{lang_hint}"
    hit = _get_ocr_cache(cache_key)
    if hit:
        return hit

    image = Image.open(BytesIO(content)).convert("RGB")
    image.thumbnail((1200, 1200))
    detections = _get_reader(lang_hint).readtext(np.array(image), detail=1)

    # With detail=1 each detection is indexed as [1] = text, [2] = confidence.
    words = []
    confidences = []
    for detection in detections:
        words.append(detection[1])
        confidences.append(detection[2])

    if confidences:
        mean_conf = sum(confidences) / len(confidences)
    else:
        mean_conf = 0.0

    outcome = {
        "text": " ".join(words),
        "word_count": len(words),
        "avg_confidence": round(mean_conf, 3),
        "is_readable": len(words) >= 3 and mean_conf > 0.15,
    }
    _set_ocr_cache(cache_key, outcome)
    return outcome
# Words and phrases that suggest the OCR text comes from a nutrition or
# ingredients panel. Matched as lower-case substrings by detect_label_presence.
LABEL_KEYWORDS = [
    'ingredients','nutrition','nutritional','calories','calorie','protein','fat',
    'carbohydrate','carbs','sodium','sugar','sugars','fiber','fibre','serving',
    'cholesterol','saturated','trans','vitamin','calcium','iron','per 100g',
    'per 100 g','daily value','daily values','amount per','total fat','contains',
    'may contain','preservative','flavour','flavor','emulsifier','mg','mcg','kcal',
    'kj','% dv','%dv','g per','per serving','fssai','best before','mfg','mrp',
    'net wt','manufactured','packed','allergen','gluten','lactose','nuts',
    'energy','carbohydrates','dietary','mineral','zinc','phosphorus',
    # Indian label specific
    'veg','non-veg','vegetarian','lic no','batch no','shelf life',
    'use before','consume before','store in','keep dry',
]
# ONLY strong marketing-only phrases that NEVER appear on back labels
# Removed: natural, organic, light, baked, roasted, flavoured — all appear in ingredient lists
FRONT_PACK_SIGNALS = [
    'new improved','now better','great taste','loved by','award winning',
    'no.1 brand','number 1','trusted brand',
]
# Phrases treated as stronger evidence of a nutrition table or regulatory
# panel; detect_label_presence counts how many of these occur.
NUTRITION_TABLE_ANCHORS = [
    'per 100g','per 100 g','per serving','serving size','amount per','daily value',
    'daily values','% dv','%dv','calories','calorie','kcal','kj','energy',
    'nutrition facts','nutritional information','nutritional value','total fat',
    'saturated fat','trans fat','total carbohydrate','dietary fiber','ingredients:',
    'ingredients list','fssai','best before','mfg','mrp','net wt','net weight',
    # Indian label specific anchors
    'veg ','non-veg','licence no','lic. no','batch no','mfg date',
]
def detect_label_presence(ocr_text):
    """Decide whether OCR text looks like a back-of-pack nutrition/ingredients label.

    The lower-cased text is searched (by substring) for ``LABEL_KEYWORDS``,
    ``FRONT_PACK_SIGNALS`` and ``NUTRITION_TABLE_ANCHORS``. Rules are tried in
    order; the first that matches decides the result:

      1. >= 2 anchors and >= 3 label keywords -> label (high if >= 6 keywords,
         else medium).
      2. >= 1 anchor and >= 2 label keywords  -> label, medium.
      3. >= 4 label keywords and no marketing phrases -> label, low.
      4. any regulatory/packaging marker (FSSAI, MRP, batch no, ...) -> label, medium.
      5. >= 2 label keywords -> label, low, suggestion 'partial'.
      6. otherwise -> no label; suggestion 'wrong_side' if marketing phrases
         were found, else 'no_label'.

    Returns:
        Dict with ``has_label``, ``confidence``, ``label_hits``,
        ``front_hits`` and ``suggestion``. Empty text yields
        ``has_label=False`` with suggestion 'no_text'.
    """
    def _verdict(has_label, confidence, labels, fronts, suggestion):
        # Single place that fixes the shape of the returned dict.
        return {'has_label': has_label, 'confidence': confidence,
                'label_hits': labels, 'front_hits': fronts,
                'suggestion': suggestion}

    if not ocr_text:
        return _verdict(False, 'high', [], [], 'no_text')

    lowered = ocr_text.lower()
    label_hits = [kw for kw in LABEL_KEYWORDS if kw in lowered]
    front_hits = [kw for kw in FRONT_PACK_SIGNALS if kw in lowered]
    anchor_count = sum(1 for kw in NUTRITION_TABLE_ANCHORS if kw in lowered)
    label_count = len(label_hits)
    front_count = len(front_hits)

    # Strong evidence: several anchors plus several keywords, regardless of
    # any marketing phrases present.
    if anchor_count >= 2 and label_count >= 3:
        confidence = 'high' if label_count >= 6 else 'medium'
        return _verdict(True, confidence, label_hits[:5], front_hits[:3], None)

    # Moderate evidence: covers labels with non-standard formatting.
    if anchor_count >= 1 and label_count >= 2:
        return _verdict(True, 'medium', label_hits[:5], front_hits[:3], None)

    # Weak but plausible: e.g. "protein 8g fat 5g sugar 3g" without headers.
    if label_count >= 4 and front_count == 0:
        return _verdict(True, 'low', label_hits, [], None)

    # Regulatory / packaging markers are taken as confirmation of a back label.
    strong_anchors = ('fssai', 'best before', 'mfg', 'mrp', 'net wt', 'net weight',
                      'batch no', 'lic no', 'manufactured', 'packed by')
    for marker in strong_anchors:
        if marker in lowered:
            return _verdict(True, 'medium', label_hits, front_hits, None)

    # Some label words remain: give the benefit of the doubt and try analysis.
    if label_count >= 2:
        return _verdict(True, 'low', label_hits, front_hits, 'partial')

    # Nothing nutrition-like: front of pack if marketing phrases were seen.
    suggestion = 'wrong_side' if front_count > 0 else 'no_label'
    return _verdict(False, 'high', label_hits, front_hits[:3], suggestion)
# ──────────────────────────────────────────────────────────────────────
# LLM
# ──────────────────────────────────────────────────────────────────────
# Groq API client. Stays None when GROQ_API_KEY is not configured; the `groq`
# package is only imported when a key is present.
_groq_client = None
if GROQ_API_KEY:
    from groq import Groq
    _groq_client = Groq(api_key=GROQ_API_KEY)
def call_llm(prompt, max_tokens=2500):
    """Send ``prompt`` to Groq and return the raw JSON text of the reply.

    Models are tried in order; the first successful completion wins and each
    failure is logged.

    Args:
        prompt: Full user prompt.
        max_tokens: Completion token limit passed to the API.

    Returns:
        The message content of the first choice.

    Raises:
        RuntimeError: If no client is configured, or if every model fails (the
            last underlying error is attached as ``__cause__``).
    """
    if not _groq_client:
        raise RuntimeError("GROQ_API_KEY not set")
    last_error = None
    for model in ["llama-3.3-70b-versatile", "llama-3.1-8b-instant"]:
        try:
            comp = _groq_client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": prompt}],
                temperature=0.1,
                max_tokens=max_tokens,
                response_format={"type": "json_object"},
            )
            return comp.choices[0].message.content
        except Exception as exc:
            last_error = exc
            logger.warning("LLM %s failed: %s", model, exc)
    # Chain the last failure so the real cause is visible in tracebacks.
    raise RuntimeError("All LLM models failed") from last_error
| def _sanitise_result(result): | |
| cd = result.get("chart_data") | |
| if isinstance(cd,list) and len(cd)==3 and all(isinstance(x,(int,float)) for x in cd): | |
| total = sum(cd) | |
| if total > 0 and total != 100: | |
| scaled = [round(v*100/total) for v in cd] | |
| scaled[scaled.index(max(scaled))] += 100 - sum(scaled) | |
| result["chart_data"] = scaled | |
| else: | |
| result["chart_data"] = [70,20,10] | |
| for n in result.get("nutrient_breakdown",[]): | |
| m = re.search(r"[\d]+\.?[\d]*", str(n.get("value","")).replace(",",".")) | |
| if m: n["value"] = float(m.group()) | |
| result.setdefault("score",5); result.setdefault("verdict","Analyzed") | |
| result.setdefault("product_name","Unknown Product") | |
| result.setdefault("nutrient_breakdown",[]); result.setdefault("pros",[]) | |
| result.setdefault("cons",[]); result.setdefault("age_warnings",[]) | |
| result.setdefault("is_low_confidence",False) | |
| return result | |
async def analyse_label(extracted_text, persona, age_group, product_category,
                        language, web_context, blur_info, label_confidence):
    """Ask the LLM for a structured nutrition analysis of OCR'd label text.

    Args:
        extracted_text: OCR text of the label.
        persona: User persona inserted into the prompt.
        age_group: Age group inserted into the prompt.
        product_category: Product category inserted into the prompt.
        language: Language code; mapped to a language name via ``LANGUAGE_MAP``
            (English when unknown).
        web_context: Extra context text inserted into the prompt.
        blur_info: Dict; when ``detected`` is truthy its ``severity`` and
            ``deblurred`` entries are mentioned in the prompt.
        label_confidence: "low" adds a caution about partial label text.

    Returns:
        The sanitised analysis dict with ``disclaimer`` added. Results are
        cached; a cache hit is returned as stored.

    Raises:
        RuntimeError: Propagated from ``call_llm``.
        ValueError: If the LLM reply is not valid JSON (``json.JSONDecodeError``).
    """
    # Key on a digest of the WHOLE label text plus every prompt input that
    # selects the answer. Keying on the first 80 characters let different
    # products that share a header ("Nutrition Facts Serving size ...") return
    # each other's cached analysis.
    text_digest = hashlib.sha256(extracted_text.encode("utf-8")).hexdigest()
    cache_key = f"v4:{language}:{persona}:{age_group}:{product_category}:{text_digest}"
    cached = _get_ai_cache(cache_key)
    if cached:
        return cached

    lang_name = LANGUAGE_MAP.get(language, "English")
    conf_note = ("⚠️ Label text may be partial — only list nutrients you can read confidently."
                 if label_confidence == "low" else "")
    blur_ctx = ""
    if blur_info.get("detected"):
        verb = "enhanced via Wiener deconvolution" if blur_info.get("deblurred") else "blurry, used original"
        blur_ctx = f"IMAGE: {blur_info['severity']}ly blurry ({verb}). Only report confident values."

    prompt = f"""[INST]
You are an expert nutritional scientist and food safety auditor.
CRITICAL: Respond ENTIRELY in {lang_name}. Every text field MUST be in {lang_name}.
Persona: {persona} | Age: {age_group} | Category: {product_category}
{conf_note}
{blur_ctx}
Label Text: "{extracted_text}"
Web Context: "{web_context}"
Return ONLY valid JSON — no markdown, no preamble:
{{
"product_name" : "Short name from label",
"product_category" : "Snack|Dairy|Beverage|Cereal|Supplement|etc.",
"score" : <INTEGER 1-10 per SCORING RUBRIC — never default to 6 or 7>,
"verdict" : "Two-word verdict in {lang_name}",
"chart_data" : [<Safe%>, <Moderate%>, <Risky%>],
"summary" : "2-sentence professional summary in {lang_name}.",
"eli5_explanation" : "Child-friendly explanation with emojis in {lang_name}.",
"molecular_insight" : "1-2 sentences on biochemical impact in {lang_name}.",
"paragraph_benefits": "Full paragraph on genuine benefits in {lang_name}.",
"paragraph_uniqueness": "Unique characteristics OR 2 better alternatives in {lang_name}.",
"is_unique" : true,
"nutrient_breakdown": [
{{"name":"Protein","value":<ACTUAL g>,"unit":"g","rating":"good","impact":"brief note in {lang_name}"}},
{{"name":"Sugar","value":<ACTUAL g>,"unit":"g","rating":"moderate","impact":"brief note"}},
{{"name":"Fat","value":<ACTUAL g>,"unit":"g","rating":"good","impact":"brief note"}},
{{"name":"Sodium","value":<ACTUAL mg>,"unit":"mg","rating":"caution","impact":"brief note"}},
{{"name":"Fiber","value":<ACTUAL g>,"unit":"g","rating":"good","impact":"brief note"}}
],
"pros" : ["Benefit 1 in {lang_name}", "Benefit 2", "Benefit 3"],
"cons" : ["Risk 1 in {lang_name}", "Risk 2"],
"age_warnings" : [
{{"group":"Children","emoji":"👶","status":"warning","message":"in {lang_name}"}},
{{"group":"Adults","emoji":"🧑","status":"good","message":"in {lang_name}"}},
{{"group":"Seniors","emoji":"👴","status":"caution","message":"in {lang_name}"}},
{{"group":"Pregnant","emoji":"🤰","status":"caution","message":"in {lang_name}"}}
],
"better_alternative": "A specific healthier alternative in {lang_name}.",
"is_low_confidence" : false
}}
SCORING RUBRIC — MANDATORY, never use 6 or 7 as defaults:
9-10: Whole food, no added sugar, low sodium, high fibre/protein
7-8 : Mildly processed, sugar <5g/100g, reasonable sodium
5-6 : Processed, sugar 5-15g/100g OR sodium 400-700mg/100g
3-4 : High sugar >15g/100g OR sodium >700mg/100g OR poor profile
1-2 : Ultra-processed, very high sugar/sodium/sat-fat
RULES: chart_data sums to 100 | rating: good|moderate|caution|bad | status: good|caution|warning
[/INST]"""

    # call_llm is blocking; run it in a worker thread to keep the event loop free.
    raw = await asyncio.to_thread(call_llm, prompt, 2500)
    result = _sanitise_result(json.loads(raw))
    result["disclaimer"] = MEDICAL_DISCLAIMER
    # Per-request fields must never be cached.
    cacheable = {k: v for k, v in result.items()
                 if k not in ("blur_info", "scan_meta", "allergen_warning")}
    _set_ai_cache(cache_key, cacheable)
    return result
def upsert_food_product(name, nutrients, score, ingredients_raw="",
                        barcode=None, brand="", category="", source="llm_scan"):
    """Record a scanned product in ``food_products`` and return its row id.

    An existing product (matched by ``barcode`` when given, otherwise by
    name + brand) only has its ``scan_count`` incremented; otherwise a new row
    is inserted with nutrient values taken from ``nutrients``.

    Args:
        name: Product name (surrounding whitespace is ignored).
        nutrients: List of dicts with ``name`` and ``value`` entries.
        score: Eatlytic score to store on a new row.
        ingredients_raw: Raw ingredients text.
        barcode: Optional barcode used as the primary match key.
        brand: Brand name (surrounding whitespace is ignored).
        category: Product category.
        source: Origin tag stored on a new row.

    Returns:
        The ``id`` of the existing or newly inserted row.
    """
    def _get(key, exclude=()):
        # Value of the first nutrient whose name contains `key` and none of
        # the `exclude` words; 0 if absent or not numeric.
        for n in nutrients:
            label = str(n.get("name") or "").lower()
            if key in label and not any(word in label for word in exclude):
                v = n.get("value", 0)
                return float(v) if isinstance(v, (int, float)) else 0
        return 0

    cal = _get("calorie") or _get("energy")
    prot = _get("protein")
    carb = _get("carb")
    # "fat" is also a substring of "Saturated Fat" / "Trans Fat"; skip those so
    # total fat is not overwritten by a sub-type that happens to be listed first.
    fat = _get("fat", exclude=("saturated", "trans"))
    sod = _get("sodium")
    fib = _get("fiber") or _get("fibre")
    sug = _get("sugar")
    sat = _get("saturated")

    # Normalise once so the lookup and the insert agree; previously brand was
    # stripped for the lookup but stored unstripped, creating duplicate rows.
    clean_name = name.strip()
    clean_brand = brand.strip()

    with db_conn() as conn:
        if barcode:
            existing = conn.execute(
                "SELECT id FROM food_products WHERE barcode=?", (barcode,)
            ).fetchone()
        else:
            existing = conn.execute(
                "SELECT id FROM food_products WHERE name=? AND brand=?",
                (clean_name, clean_brand),
            ).fetchone()
        if existing:
            conn.execute(
                "UPDATE food_products SET scan_count=scan_count+1, updated_at=datetime('now') WHERE id=?",
                (existing["id"],),
            )
            return existing["id"]
        cursor = conn.execute(
            "INSERT INTO food_products(name,brand,category,barcode,calories_100g,protein_100g,carbs_100g,fat_100g,sodium_100g,fiber_100g,sugar_100g,sat_fat_100g,eatlytic_score,ingredients_raw,source,scan_count) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,1)",
            (clean_name, clean_brand, category, barcode, cal, prot, carb, fat,
             sod, fib, sug, sat, score, ingredients_raw, source),
        )
        return cursor.lastrowid
# ──────────────────────────────────────────────────────────────────────
# PAYMENTS
# ──────────────────────────────────────────────────────────────────────
# Amount charged per Pro order, in paise; sent to Razorpay with currency "INR".
PRO_AMOUNT_PAISE = 19900
def _create_razorpay_order(user_id, device_key=""):
    """Create a Razorpay order for the Pro plan and record it in ``payments``.

    Args:
        user_id: ID of the purchasing user; may be empty for a device-only
            purchase, in which case ``device_key`` labels the receipt.
        device_key: Device identifier stored with the payment row.

    Returns:
        Dict with ``order_id``, ``amount``, ``currency`` and ``key_id`` for the
        client-side checkout.

    Raises:
        RuntimeError: If the Razorpay credentials are not configured or the
            ``razorpay`` package is not installed.
    """
    if not RAZORPAY_KEY_ID or not RAZORPAY_KEY_SECRET:
        raise RuntimeError("RAZORPAY_KEY_ID and RAZORPAY_KEY_SECRET env vars required")
    try:
        import razorpay
    except ImportError as exc:
        raise RuntimeError("razorpay package not installed") from exc
    client = razorpay.Client(auth=(RAZORPAY_KEY_ID, RAZORPAY_KEY_SECRET))

    # user_id can be empty/None for device-only purchases; slicing None crashed.
    owner = str(user_id or device_key or "anon")
    stamp = datetime.datetime.utcnow().strftime('%Y%m%d%H%M%S')
    order = client.order.create({
        "amount": PRO_AMOUNT_PAISE, "currency": "INR",
        "receipt": f"eat_{owner[:8]}_{stamp}",
        "notes": {"user_id": user_id or "", "product": "eatlytic_pro"},
    })
    with db_conn() as conn:
        conn.execute(
            "INSERT INTO payments(user_id,device_key,razorpay_order_id,amount_paise,status) VALUES(?,?,?,?,?)",
            (user_id, device_key, order["id"], PRO_AMOUNT_PAISE, "created"),
        )
    return {"order_id": order["id"], "amount": PRO_AMOUNT_PAISE, "currency": "INR",
            "key_id": RAZORPAY_KEY_ID}
def _verify_razorpay_payment(order_id, payment_id, signature):
    """Check a Razorpay checkout signature.

    The expected value is the hex HMAC-SHA256 of ``"<order_id>|<payment_id>"``
    keyed with ``RAZORPAY_KEY_SECRET``.

    Returns:
        True only when a secret is configured and ``signature`` matches.
    """
    # Fail closed: with an empty key anyone could compute the "expected" HMAC.
    if not RAZORPAY_KEY_SECRET or not isinstance(signature, str):
        return False
    expected = hmac.new(RAZORPAY_KEY_SECRET.encode(),
                        f"{order_id}|{payment_id}".encode(), hashlib.sha256).hexdigest()
    # Compare as bytes: compare_digest() raises TypeError on non-ASCII str,
    # which would turn a bad signature into a server error.
    return hmac.compare_digest(expected.encode(), signature.encode("utf-8", "replace"))
def _activate_pro_payment(order_id, payment_id, signature):
    """Mark an order as paid and switch on Pro for its user and device.

    An order that is already ``paid`` is acknowledged without touching the
    expiry, so replaying a valid (order, payment, signature) triple cannot
    keep extending the subscription.

    Returns:
        ``{"success": True, "user_id": ..., "expires": ...}``. For a replayed
        order ``expires`` is the value currently stored for the user (or None).

    Raises:
        ValueError: If the signature is invalid or the order is unknown.
    """
    if not _verify_razorpay_payment(order_id, payment_id, signature):
        raise ValueError("Invalid payment signature β possible tampering")
    # Naive UTC timestamp, same text format the deprecated utcnow() produced.
    now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    expires = (now_utc + datetime.timedelta(days=31)).isoformat()
    with db_conn() as conn:
        row = conn.execute(
            "SELECT user_id, device_key, status FROM payments WHERE razorpay_order_id=?",
            (order_id,)).fetchone()
        if not row:
            raise ValueError(f"Order {order_id} not found")
        user_id = row["user_id"]
        device_key = row["device_key"]

        if row["status"] == "paid":
            # Idempotent replay: report the stored expiry, change nothing.
            stored = None
            if user_id:
                stored = conn.execute("SELECT pro_expires FROM users WHERE id=?", (user_id,)).fetchone()
            return {"success": True, "user_id": user_id,
                    "expires": stored["pro_expires"] if stored else None}

        conn.execute(
            "UPDATE payments SET razorpay_payment_id=?,razorpay_signature=?,status='paid',paid_at=datetime('now') WHERE razorpay_order_id=?",
            (payment_id, signature, order_id))
        if user_id:
            conn.execute("UPDATE users SET is_pro=1, pro_expires=? WHERE id=?", (expires, user_id))
        if device_key:
            conn.execute("UPDATE devices SET is_pro=1 WHERE device_key=?", (device_key,))
    return {"success": True, "user_id": user_id, "expires": expires}
| # ────────────────────────────────────────────────────────────────────── | |
| # FASTAPI APP | |
| # ────────────────────────────────────────────────────────────────────── | |
# Rate limiter keyed on the caller's remote address; the app exposes it on
# app.state and answers RateLimitExceeded with slowapi's stock handler.
limiter = Limiter(key_func=get_remote_address)

app = FastAPI(title="Eatlytic v4 β Food Intelligence", version="4.0")
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# CORS: any origin and any header, restricted to the four listed methods.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["GET", "POST", "DELETE", "PATCH"],
    allow_headers=["*"],
)

# Optional X-API-Key header; auto_error=False means a missing key is not rejected here.
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
def _get_request_user(request):
    """Resolve the signed-in user from the ``Authorization`` header.

    Returns:
        Whatever ``_get_user_from_token`` returns for the bearer token, or
        None when the header is missing, uses another scheme, or carries an
        empty token.
    """
    auth = request.headers.get("Authorization", "")
    scheme, _, credentials = auth.partition(" ")
    # HTTP auth scheme names are case-insensitive, so accept "bearer" too.
    token = credentials.strip() if scheme.lower() == "bearer" else ""
    return _get_user_from_token(token) if token else None
def _device_key(request):
    """Derive a 16-hex-character device fingerprint from client IP and user agent.

    The IP falls back to ``"unknown"`` when the request has no client info.
    """
    peer = request.client
    address = peer.host if peer else "unknown"
    agent = request.headers.get("user-agent", "")
    fingerprint = f"{address}:{agent}".encode()
    digest = hashlib.md5(fingerprint).hexdigest()
    return digest[:16]
def _ensure_device(dk):
    """Make sure a ``devices`` row exists for ``dk`` (best effort).

    Failures never propagate to the caller, but they are logged so a broken
    database does not go unnoticed.
    """
    try:
        with db_conn() as conn:
            conn.execute("INSERT OR IGNORE INTO devices(device_key) VALUES(?)", (dk,))
    except Exception as exc:
        logger.warning("Could not register device %s: %s", dk, exc)
def _check_scan_quota(user, device_key):
    """Check the monthly scan allowance and, when allowed, count one scan.

    Signed-in users are delegated to ``_check_scan_quota_user``. Anonymous
    callers are tracked per device: the counter is reset when the stored
    month differs from the current one, Pro devices are always allowed, and
    free devices are refused once they reach ``FREE_SCAN_LIMIT``.

    Returns:
        A dict with ``allowed``, ``scans_used``, ``scans_remaining``, ``is_pro``.
    """
    if user:
        return _check_scan_quota_user(user["id"])

    def _verdict(allowed, used, remaining, pro):
        return {"allowed": allowed, "scans_used": used,
                "scans_remaining": remaining, "is_pro": pro}

    current_month = datetime.date.today().isoformat()[:7]
    _ensure_device(device_key)
    bump_sql = "UPDATE devices SET scan_count=scan_count+1 WHERE device_key=?"

    with db_conn() as conn:
        device = conn.execute(
            "SELECT is_pro, month, scan_count FROM devices WHERE device_key=?",
            (device_key,)).fetchone()
        if not device:
            return _verdict(False, 0, 0, False)

        if device["month"] == current_month:
            used = device["scan_count"]
        else:
            # First scan of a new month: restart the counter.
            conn.execute("UPDATE devices SET month=?, scan_count=0 WHERE device_key=?",
                         (current_month, device_key))
            used = 0

        if device["is_pro"]:
            conn.execute(bump_sql, (device_key,))
            return _verdict(True, used + 1, 9999, True)

        if used >= FREE_SCAN_LIMIT:
            return _verdict(False, used, 0, False)

        conn.execute(bump_sql, (device_key,))

    used += 1
    return _verdict(True, used, FREE_SCAN_LIMIT - used, False)
def _get_live_search(query):
    """Return up to three web-search snippets for ``query`` as one string.

    Each line is ``"<title>: <body>"``. A fixed placeholder string is returned
    when the search package is unavailable, nothing is found, or the lookup
    raises.
    """
    if not _DDGS_OK:
        return "Web search unavailable."
    try:
        snippets = []
        with _DDGS() as ddgs:
            for hit in ddgs.text(query, max_results=3):
                snippets.append(f"{hit['title']}: {hit['body']}")
        if snippets:
            return "\n".join(snippets)
        return "No web data."
    except Exception as exc:
        logger.warning("Web search: %s", exc)
        return "No web data."
| # ── Core routes ──────────────────────────────────────────────────────── | |
async def home():
    """Serve the ``index.html`` page."""
    page = FileResponse("index.html")
    return page
async def health():
    """Return a static liveness payload with the service version and DB label."""
    payload = dict(status="ok", version="4.0", db="sqlite-wal")
    return payload
async def check_image(request: Request, image: UploadFile = File(...)):
    """Validate an uploaded image and return its quality assessment.

    Rejected uploads are answered with a 400 JSON error, matching how
    ``analyze_product`` reports a ``ValueError``, instead of a 500.
    NOTE(review): assumes ``validate_image`` signals bad uploads with
    ``ValueError`` — confirm against its definition.
    """
    try:
        content = validate_image(await image.read())
    except ValueError as exc:
        return JSONResponse({"error": str(exc)}, status_code=400)
    return assess_image_quality(content)
async def enhance_preview(request: Request, image: UploadFile = File(...)):
    """Return a deblurred preview of the upload when it is judged blurry.

    A clear image is reported back unchanged together with its quality
    assessment. Rejected uploads are answered with a 400 JSON error, matching
    how ``analyze_product`` reports a ``ValueError``.
    NOTE(review): assumes ``validate_image`` signals bad uploads with
    ``ValueError`` — confirm against its definition.
    """
    try:
        content = validate_image(await image.read())
    except ValueError as exc:
        return JSONResponse({"error": str(exc)}, status_code=400)

    quality = assess_image_quality(content)
    if not quality["is_blurry"]:
        return JSONResponse({"deblurred": False, "message": "Image already clear.", "quality": quality})

    enhanced, method_log = deblur_and_enhance(content, quality["blur_severity"])
    return JSONResponse({"deblurred": True, "image_b64": image_to_b64(enhanced),
                         "method_log": method_log, "quality_before": quality})
async def perform_ocr(request: Request, image: UploadFile = File(...), language: str = Form("en")):
    """Run OCR on an uploaded image and return the OCR result.

    Rejected uploads are answered with a 400 JSON error, matching how
    ``analyze_product`` reports a ``ValueError``.
    NOTE(review): assumes ``validate_image`` signals bad uploads with
    ``ValueError`` — confirm against its definition.
    """
    try:
        content = validate_image(await image.read())
    except ValueError as exc:
        return JSONResponse({"error": str(exc)}, status_code=400)
    return run_ocr(content, language)
def _scan_allergen_warning(device_key, label_text):
    """Return an alert naming saved allergens/conditions found in the label text, else ""."""
    try:
        with db_conn() as conn:
            row = conn.execute(
                "SELECT allergens,conditions FROM allergen_profiles WHERE device_key=?",
                (device_key,)).fetchone()
        if not row:
            return ""
        haystack = label_text.lower()
        watched = json.loads(row["allergens"] or "[]") + json.loads(row["conditions"] or "[]")
        # Skip empty entries: "" is a substring of everything and would always trigger.
        triggered = [str(w) for w in watched if w and str(w).lower() in haystack]
    except Exception as exc:
        # Best effort: a broken profile must not fail the scan, but leave a trace.
        logger.warning("Allergen check skipped: %s", exc)
        return ""
    if not triggered:
        return ""
    return f"β οΈ ALLERGEN ALERT β may contain: {', '.join(triggered)}"


def _scan_macros(result):
    """Return (cal, prot, carb, fat, sod, fib, sug) from a result's nutrient breakdown."""
    nutr = {}
    for entry in result.get("nutrient_breakdown", []) or []:
        if not isinstance(entry, dict):
            continue
        value = entry.get("value")
        if isinstance(value, (int, float)):
            # .get() instead of ["name"]: a nameless entry must not abort the scan.
            nutr[str(entry.get("name") or "").lower()] = float(value)
    return (
        nutr.get("energy", nutr.get("calories", nutr.get("calorie", 0))),
        nutr.get("protein", 0),
        nutr.get("carbohydrate", nutr.get("carbs", 0)),
        nutr.get("fat", 0),
        nutr.get("sodium", 0),
        nutr.get("fiber", nutr.get("fibre", 0)),
        nutr.get("sugar", nutr.get("sugars", 0)),
    )


def _bump_device_streak(device_key):
    """Advance the device's daily scan streak (best effort, at most once per day)."""
    try:
        _ensure_device(device_key)
        today = datetime.date.today()
        td = today.isoformat()
        yd = (today - datetime.timedelta(days=1)).isoformat()
        with db_conn() as conn:
            row = conn.execute("SELECT streak_days,last_scan_date FROM devices WHERE device_key=?",
                               (device_key,)).fetchone()
            if row and row["last_scan_date"] != td:
                # Continue the streak only if the previous scan was yesterday.
                st = ((row["streak_days"] or 0) + 1) if row["last_scan_date"] == yd else 1
                conn.execute("UPDATE devices SET streak_days=?,last_scan_date=? WHERE device_key=?",
                             (st, td, device_key))
    except Exception as exc:
        logger.warning("Streak update skipped: %s", exc)


async def analyze_product(
    request: Request, persona: str = Form(...),
    age_group: str = Form("adult"), product_category: str = Form("general"),
    language: str = Form("en"), extracted_text: str = Form(None),
    image: UploadFile = File(...),
):
    """Analyse a food-label photo and return the scored result as JSON.

    Steps: check the scan quota, validate the image, deblur it when that does
    not hurt OCR quality, obtain the label text (client-supplied or OCR), run
    the label analysis with web context, then log the scan, the daily intake,
    the food-product record and the streak.

    Returns:
        ``JSONResponse`` with the analysis, or an ``error`` payload: 402 when
        the quota is exhausted, 400 for a ``ValueError``, and a 200 error body
        for the remaining failures (unchanged from the existing behaviour).
    """
    if not GROQ_API_KEY:
        return JSONResponse({"error":"Server error: GROQ_API_KEY not set in Secrets"})
    user = _get_request_user(request)
    device_key = _device_key(request)
    scan_check = _check_scan_quota(user, device_key)
    if not scan_check["allowed"]:
        return JSONResponse(status_code=402, content={
            "error":"scan_limit_reached",
            "message":f"You've used all {FREE_SCAN_LIMIT} free scans this month.",
            "upgrade_url":"/payments/create-order"})
    try:
        content = validate_image(await image.read())
        quality = assess_image_quality(content)
        blur_info = {"detected":quality["is_blurry"],"severity":quality["blur_severity"],
                     "score":quality["blur_score"],"deblurred":False,
                     "method_log":None,"image_b64":None,"ocr_source":"original"}
        working = content
        ocr_result = None  # OCR output already computed for `working`, if any
        if quality["is_blurry"]:
            try:
                enhanced, method_log = deblur_and_enhance(content, quality["blur_severity"])
                ocr_enhanced = run_ocr(enhanced, language)
                ocr_original = run_ocr(content, language)
                # Keep the deblurred image unless it reads clearly worse (15% tolerance).
                if ocr_quality_score(ocr_enhanced) >= ocr_quality_score(ocr_original) * 0.85:
                    preview_b64 = image_to_b64(enhanced)
                    working = enhanced
                    blur_info.update(deblurred=True, method_log=method_log,
                                     image_b64=preview_b64, ocr_source="deblurred")
                    # Client-supplied text came from the blurry image; discard it.
                    extracted_text = None
                    ocr_result = ocr_enhanced
                elif not extracted_text:
                    ocr_result = ocr_original
            except Exception as exc:
                logger.warning("Deblur: %s", exc)

        if not extracted_text:
            # Reuse the comparison pass instead of running OCR a third time.
            if ocr_result is None:
                ocr_result = run_ocr(working, language)
            extracted_text = ocr_result["text"]
            ocr_wc = ocr_result["word_count"]
        else:
            ocr_wc = len(extracted_text.split())
        if not extracted_text or ocr_wc == 0:
            return JSONResponse({"error":"no_text","message":"No text found. Make sure the label is facing the camera.","tip":"flip_product"})

        label_check = detect_label_presence(extracted_text)
        # Only stop when the detector reports no text at all; every uncertain
        # detection is passed on to the LLM analysis.
        if not label_check["has_label"] and label_check.get("suggestion") == "no_text":
            return JSONResponse({"error":"no_text",
                                 "message":"No text found in image. Make sure the label is facing the camera.",
                                 "tip":"flip_product"})

        allergen_warning = _scan_allergen_warning(device_key, extracted_text)
        web_context = await asyncio.to_thread(_get_live_search, f"health analysis ingredients {extracted_text[:120]}")
        result = await analyse_label(extracted_text, persona, age_group, product_category,
                                     language, web_context, blur_info, label_check.get("confidence","medium"))
        result["allergen_warning"] = allergen_warning
        result["blur_info"] = blur_info
        result["scan_meta"] = scan_check

        today = datetime.date.today().isoformat()
        cal, prot, carb, fat, sod, fib, sug = _scan_macros(result)
        owner_id = user["id"] if user else None
        with db_conn() as conn:
            conn.execute("INSERT INTO daily_logs(user_id,device_key,log_date,meal_name,calories,protein,carbs,fat,sodium,fiber,sugar,source) VALUES(?,?,?,?,?,?,?,?,?,?,?,?)",
                         (owner_id,device_key,today,result.get("product_name","Scanned item"),cal,prot,carb,fat,sod,fib,sug,"scan"))
            conn.execute("INSERT INTO scans(user_id,device_key,product_name,score,verdict,calories,protein,carbs,fat,sodium,fiber,sugar,persona,language,analysis_json) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
                         (owner_id,device_key,result.get("product_name","Unknown"),result.get("score",0),result.get("verdict",""),
                          cal,prot,carb,fat,sod,fib,sug,persona,language,
                          json.dumps({k:v for k,v in result.items() if k not in ("blur_info","scan_meta","allergen_warning")})))

        try:
            upsert_food_product(name=result.get("product_name",""), nutrients=result.get("nutrient_breakdown",[]),
                                score=result.get("score",0), ingredients_raw=extracted_text,
                                category=result.get("product_category",""), source="llm_scan")
        except Exception as exc:
            logger.warning("Food DB upsert skipped: %s", exc)

        if user:
            _update_streak_user(user["id"])
        else:
            _bump_device_streak(device_key)
        return JSONResponse(result)
    except ValueError as exc:
        return JSONResponse({"error":str(exc)}, status_code=400)
    except Exception as exc:
        logger.error("Analysis error: %s", exc, exc_info=True)
        return JSONResponse({"error":f"Scan failed: {str(exc)[:140]}. Please try again."})
async def scan_status(request: Request):
    """Report the caller's scan usage for the current month.

    Signed-in callers are read from ``users``, anonymous ones from
    ``devices``. When the stored month is not the current month the usage is
    reported as zero, but the Pro flag and streak still come from the stored
    row (the original reported every such caller as non-Pro with streak 0).

    Returns:
        Dict with ``scans_used``, ``scans_remaining``, ``is_pro``, ``limit``,
        ``streak`` and ``authenticated``.
    """
    user = _get_request_user(request)
    device_key = _device_key(request)
    _ensure_device(device_key)
    month_key = datetime.date.today().isoformat()[:7]
    authenticated = bool(user)

    with db_conn() as conn:
        if user:
            row = conn.execute("SELECT is_pro,scan_month,scan_count_month,streak_days FROM users WHERE id=?",
                               (user["id"],)).fetchone()
            month_col, count_col = "scan_month", "scan_count_month"
        else:
            row = conn.execute("SELECT is_pro,month,scan_count,streak_days FROM devices WHERE device_key=?",
                               (device_key,)).fetchone()
            month_col, count_col = "month", "scan_count"

    if not row:
        return {"scans_used":0,"scans_remaining":FREE_SCAN_LIMIT,"is_pro":False,
                "limit":FREE_SCAN_LIMIT,"streak":0,"authenticated":authenticated}

    is_pro = bool(row["is_pro"])
    # A stale month means nothing has been counted yet this month.
    used = (row[count_col] or 0) if row[month_col] == month_key else 0
    return {"scans_used":used,
            "scans_remaining":9999 if is_pro else max(0, FREE_SCAN_LIMIT-used),
            "is_pro":is_pro,"limit":FREE_SCAN_LIMIT,
            "streak":row["streak_days"] or 0,"authenticated":authenticated}
| # ── Auth routes ──────────────────────────────────────────────────────── | |
async def request_otp(email: str = Form(...)):
    """Issue a one-time password for ``email``.

    SECURITY: the response has always echoed the OTP as ``_dev_otp``, which
    lets any caller sign in as any email address. The echo is kept by default
    for backward compatibility but can now be switched off by setting the
    ``EATLYTIC_EXPOSE_DEV_OTP`` environment variable to ``0``; do that on any
    deployment where OTPs are actually delivered to the user.
    """
    otp = _send_otp(email)
    payload = {"sent": True, "message": "OTP sent."}
    if os.environ.get("EATLYTIC_EXPOSE_DEV_OTP", "1") != "0":
        logger.warning("OTP echoed in HTTP response (dev mode); set EATLYTIC_EXPOSE_DEV_OTP=0 in production")
        payload["_dev_otp"] = otp
    return JSONResponse(payload)
async def verify_otp(request: Request, email: str = Form(...), otp: str = Form(...)):
    """Exchange a valid OTP for a session token.

    Raises:
        HTTPException: 401 when the OTP is rejected.
    """
    account = _verify_otp(email, otp)
    if not account:
        raise HTTPException(status_code=401, detail="Invalid or expired OTP")

    # The session is tagged with (at most) the first 100 chars of the user agent.
    agent = request.headers.get("user-agent", "")[:100]
    session_token = _create_session(account["id"], agent)
    body = {
        "token": session_token,
        "user_id": account["id"],
        "email": account.get("email", ""),
        "is_pro": bool(account.get("is_pro", 0)),
    }
    return JSONResponse(body)
async def logout(request: Request):
    """Delete the session named by the bearer token, if one was sent.

    Always answers ``{"logged_out": True}``, whether or not a session existed.
    """
    auth = request.headers.get("Authorization", "")
    scheme, _, credentials = auth.partition(" ")
    # HTTP auth scheme names are case-insensitive, so accept "bearer" too.
    token = credentials.strip() if scheme.lower() == "bearer" else ""
    if token:
        with db_conn() as conn:
            conn.execute("DELETE FROM sessions WHERE token=?", (token,))
    return JSONResponse({"logged_out": True})
async def get_me(request: Request):
    """Return the signed-in user's profile fields.

    Raises:
        HTTPException: 401 when the request carries no valid session.
    """
    account = _get_request_user(request)
    if not account:
        raise HTTPException(status_code=401, detail="Not authenticated")

    profile = {
        "user_id": account["id"],
        "email": account.get("email", ""),
        "name": account.get("name", ""),
        "is_pro": bool(account.get("is_pro", 0)),
        "streak_days": account.get("streak_days", 0),
        "persona": account.get("persona", "General Adult"),
        "language": account.get("language", "en"),
    }
    return JSONResponse(profile)
| # ── Payment routes ───────────────────────────────────────────────────── | |
async def create_order(request: Request):
    """Create a Razorpay order for the signed-in user.

    Raises:
        HTTPException: 401 when not signed in; 503 when order creation
            reports a ``RuntimeError``.
    """
    account = _get_request_user(request)
    if not account:
        raise HTTPException(status_code=401, detail="Login required. POST /auth/request-otp first.")

    fingerprint = _device_key(request)
    try:
        order = _create_razorpay_order(account["id"], fingerprint)
        return JSONResponse(order)
    except RuntimeError as exc:
        raise HTTPException(status_code=503, detail=str(exc))
async def verify_payment(request: Request, razorpay_order_id: str = Form(...),
                         razorpay_payment_id: str = Form(...), razorpay_signature: str = Form(...)):
    """Verify a Razorpay checkout callback and activate Pro.

    Raises:
        HTTPException: 400 when activation reports a ``ValueError``.
    """
    try:
        outcome = _activate_pro_payment(razorpay_order_id, razorpay_payment_id, razorpay_signature)
        return JSONResponse(outcome)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc))
async def activate_pro_legacy(request: Request, payment_id: str = Form(...)):
    """Legacy endpoint kept for backward compatibility.

    Marks the calling device as Pro, but only when ``payment_id`` belongs to
    a payment already recorded as ``paid``. The original ignored
    ``payment_id`` entirely, so any caller could unlock Pro for free.

    Raises:
        HTTPException: 402 when no paid payment matches ``payment_id``.
    """
    device_key = _device_key(request)
    _ensure_device(device_key)
    with db_conn() as conn:
        paid = conn.execute(
            "SELECT 1 FROM payments WHERE razorpay_payment_id=? AND status='paid'",
            (payment_id,)).fetchone()
        if paid:
            conn.execute("UPDATE devices SET is_pro=1 WHERE device_key=?", (device_key,))
    if not paid:
        raise HTTPException(status_code=402,
                            detail="Payment not verified. Use /payments/create-order for real billing.")
    return {"status":"activated","message":"Pro activated. Use /payments/create-order for real billing."}
| # ── Food DB routes ───────────────────────────────────────────────────── | |
async def food_search(request: Request, q: str = ""):
    """Search verified products locally, falling back to Open Food Facts.

    Returns:
        ``{"products": [...], "source": ...}`` where source is ``none`` (query
        shorter than two characters), ``eatlytic_db``, ``openfoodfacts`` or
        ``unavailable`` (the fallback request failed).
    """
    term = (q or "").strip()
    if len(term) < 2:
        return {"products":[],"source":"none"}

    # Escape LIKE wildcards so "%" or "_" in the query match literally.
    like = "%" + term.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_") + "%"
    with db_conn() as conn:
        rows = conn.execute(
            "SELECT name,brand,category,calories_100g,protein_100g,carbs_100g,fat_100g,sodium_100g,fiber_100g,sugar_100g,eatlytic_score,verified FROM food_products WHERE (name LIKE ? ESCAPE '\\' OR brand LIKE ? ESCAPE '\\') AND verified=1 ORDER BY scan_count DESC LIMIT 10",
            (like, like)).fetchall()
    if rows:
        return {"products":[dict(r) for r in rows],"source":"eatlytic_db"}

    def _num(nutriments, key, scale=1):
        """Read one nutriment as a rounded float; unusable values become 0."""
        try:
            return round(float(nutriments.get(key) or 0) * scale, 1)
        except (TypeError, ValueError):
            return 0

    try:
        import httpx
        async with httpx.AsyncClient(timeout=8) as hc:
            resp = await hc.get("https://world.openfoodfacts.org/cgi/search.pl",
                                params={"search_terms":term,"action":"process","json":1,"page_size":10,
                                        "fields":"product_name,brands,nutriments"})
        resp.raise_for_status()
        products = []
        for p in resp.json().get("products", []) or []:
            if not isinstance(p, dict):
                continue
            n = p.get("nutriments") or {}
            # _num() keeps a single malformed value from discarding every result.
            products.append({"name":p.get("product_name",""),"brand":p.get("brands",""),
                             "calories_100g":_num(n, "energy-kcal_100g"),
                             "protein_100g":_num(n, "proteins_100g"),
                             "carbs_100g":_num(n, "carbohydrates_100g"),
                             "fat_100g":_num(n, "fat_100g"),
                             # Scaled x1000 as in the original (presumably g -> mg).
                             "sodium_100g":_num(n, "sodium_100g", 1000),
                             "fiber_100g":_num(n, "fiber_100g"),
                             "sugar_100g":_num(n, "sugars_100g"),
                             "eatlytic_score":0,"verified":0,"source":"openfoodfacts"})
        return {"products":products,"source":"openfoodfacts"}
    except Exception as exc:
        logger.warning("Food search: %s", exc)
        return {"products":[],"source":"unavailable"}
async def food_db_stats():
    """Return total and verified product counts plus a size-based status label."""
    with db_conn() as conn:
        product_count = conn.execute("SELECT COUNT(*) FROM food_products").fetchone()[0]
        verified_count = conn.execute("SELECT COUNT(*) FROM food_products WHERE verified=1").fetchone()[0]

    # Label tiers: under 1,000 rows, under 10,000 rows, everything above.
    if product_count < 1000:
        moat = "π΄ Early (<1K)"
    elif product_count < 10000:
        moat = "π‘ Growing (1K-10K)"
    else:
        moat = "π’ Defensible (10K+)"

    return {"total_products": product_count, "verified_products": verified_count,
            "moat_status": moat}
| # ── Daily tracker routes ─────────────────────────────────────────────── | |
async def daily_summary(request: Request, date: str = None):
    """Summarise one day's logged intake against the caller's targets.

    Rows are selected by user id for signed-in callers and by device key
    otherwise. The calorie target is the stored ``tdee`` (2000 when missing
    or zero); carb and fat targets are derived from it, the rest are fixed.

    Returns:
        Dict with ``date``, ``totals``, ``targets``, ``suggestion``, ``items``
        and ``log``.
    """
    account = _get_request_user(request)
    device_key = _device_key(request)
    target_date = date or datetime.date.today().isoformat()
    user_id = account["id"] if account else None

    if user_id:
        tdee_sql, clause, param = "SELECT tdee FROM users WHERE id=?", "user_id=?", user_id
    else:
        tdee_sql, clause, param = "SELECT tdee FROM devices WHERE device_key=?", "device_key=?", device_key

    with db_conn() as conn:
        owner = conn.execute(tdee_sql, (param,)).fetchone()
        sums = conn.execute(
            f"SELECT SUM(calories) cal, SUM(protein) prot, SUM(carbs) carb, SUM(fat) fat, SUM(sodium) sod, SUM(fiber) fib, SUM(sugar) sug, COUNT(*) items FROM daily_logs WHERE {clause} AND log_date=?",
            (param, target_date)).fetchone()
        entries = conn.execute(
            f"SELECT id,meal_name,calories,protein,carbs,fat,sodium,source,logged_at FROM daily_logs WHERE {clause} AND log_date=? ORDER BY logged_at DESC",
            (param, target_date)).fetchall()

    stored_tdee = owner["tdee"] if owner else None
    tdee = float(stored_tdee or 2000) or 2000

    totals = {}
    for column in ("cal", "prot", "carb", "fat", "sod", "fib", "sug"):
        totals[column] = round(sums[column] or 0, 1)

    # Carbs: 50% of calories at 4 kcal/g; fat: 30% of calories at 9 kcal/g.
    targets = {"calories": round(tdee), "protein": 56, "carbs": round(tdee * .5 / 4),
               "fat": round(tdee * .3 / 9), "sodium": 2300, "fiber": 28, "sugar": 50}

    cal_left = max(0, targets["calories"] - totals["cal"])
    prot_left = max(0, targets["protein"] - totals["prot"])
    if cal_left < 200:
        suggestion = "π― Almost at your calorie target!"
    elif prot_left > 20:
        suggestion = f"πͺ {round(prot_left)}g protein left. Try: eggs, dal, paneer."
    elif cal_left > 600:
        suggestion = f"π½ {round(cal_left)} kcal remaining."
    else:
        suggestion = ""

    return {"date": target_date, "totals": totals, "targets": targets, "suggestion": suggestion,
            "items": sums["items"] or 0, "log": [dict(entry) for entry in entries]}
async def daily_log(request: Request, meal_name: str=Form(...), calories: float=Form(0),
                    protein: float=Form(0), carbs: float=Form(0), fat: float=Form(0),
                    sodium: float=Form(0), fiber: float=Form(0), sugar: float=Form(0),
                    source: str=Form("manual"), log_date: str=Form(None)):
    """Append one meal entry to ``daily_logs`` for the caller.

    The entry is dated ``log_date`` when given, otherwise today, and is
    tagged with both the device key and (when signed in) the user id.
    """
    account = _get_request_user(request)
    device_key = _device_key(request)
    user_id = account["id"] if account else None
    target_date = log_date if log_date else datetime.date.today().isoformat()

    _ensure_device(device_key)
    entry = (user_id, device_key, target_date, meal_name,
             calories, protein, carbs, fat, sodium, fiber, sugar, source)
    with db_conn() as conn:
        conn.execute(
            "INSERT INTO daily_logs(user_id,device_key,log_date,meal_name,calories,protein,carbs,fat,sodium,fiber,sugar,source) VALUES(?,?,?,?,?,?,?,?,?,?,?,?)",
            entry)
    return {"status": "logged", "date": target_date, "meal": meal_name}
async def delete_log(request: Request, log_id: int):
    """Delete one of the caller's ``daily_logs`` rows.

    Signed-in callers may delete rows owned by their user id or logged from
    this device; anonymous callers only rows logged from this device. The
    original matched on the device key alone, so signed-in users could not
    remove entries that ``daily_summary`` lists for them by user id.
    """
    user = _get_request_user(request)
    device_key = _device_key(request)
    with db_conn() as conn:
        if user:
            conn.execute("DELETE FROM daily_logs WHERE id=? AND (user_id=? OR device_key=?)",
                         (log_id, user["id"], device_key))
        else:
            conn.execute("DELETE FROM daily_logs WHERE id=? AND device_key=?", (log_id, device_key))
    return {"status":"deleted","id":log_id}
| # ── Profile routes ───────────────────────────────────────────────────── | |
async def onboarding_complete(request: Request, persona: str=Form("General Adult"),
                              language: str=Form("en"), tdee: float=Form(0), allergens: str=Form("[]")):
    """Store onboarding answers on the device (and user) and save allergens.

    ``allergens`` must be a JSON array; anything else is rejected with 400
    rather than stored, because readers of the profile ``json.loads`` it.
    Previously saved ``conditions`` are carried over, since
    ``INSERT OR REPLACE`` would otherwise reset them.
    """
    try:
        parsed = json.loads(allergens)
    except (TypeError, ValueError):
        parsed = None
    if not isinstance(parsed, list):
        return JSONResponse({"error":"allergens must be a JSON array"}, status_code=400)

    user = _get_request_user(request)
    device_key = _device_key(request)
    _ensure_device(device_key)
    user_id = user["id"] if user else None
    with db_conn() as conn:
        conn.execute("UPDATE devices SET onboarding_done=1,persona=?,language=?,tdee=? WHERE device_key=?",
                     (persona, language, tdee, device_key))
        if user_id:
            conn.execute("UPDATE users SET onboarding_done=1,persona=?,language=?,tdee=? WHERE id=?",
                         (persona, language, tdee, user_id))
        prior = conn.execute("SELECT conditions FROM allergen_profiles WHERE device_key=?",
                             (device_key,)).fetchone()
        conditions = (prior["conditions"] if prior else None) or "[]"
        conn.execute("INSERT OR REPLACE INTO allergen_profiles(device_key,user_id,allergens,conditions) VALUES(?,?,?,?)",
                     (device_key, user_id, allergens, conditions))
    return {"status":"ok"}
async def get_allergen_profile(request: Request):
    """Return the device's saved allergens and conditions as lists.

    Stored values that are empty, not valid JSON, or not a JSON array are
    reported as an empty list instead of failing the request.
    """
    def _as_list(raw):
        try:
            value = json.loads(raw or "[]")
        except (TypeError, ValueError):
            return []
        return value if isinstance(value, list) else []

    device_key = _device_key(request)
    with db_conn() as conn:
        row = conn.execute("SELECT allergens,conditions FROM allergen_profiles WHERE device_key=?",
                           (device_key,)).fetchone()
    if not row:
        return {"allergens":[],"conditions":[]}
    return {"allergens":_as_list(row["allergens"]),"conditions":_as_list(row["conditions"])}
async def set_allergen_profile(request: Request, allergens: str=Form("[]"), conditions: str=Form("[]")):
    """Save the device's allergen and condition lists.

    Both fields must be JSON arrays; anything else is rejected with 400
    rather than stored. The ``user_id`` already linked to the profile is
    carried over, since ``INSERT OR REPLACE`` would otherwise clear it.
    """
    for label, raw in (("allergens", allergens), ("conditions", conditions)):
        try:
            parsed = json.loads(raw)
        except (TypeError, ValueError):
            parsed = None
        if not isinstance(parsed, list):
            return JSONResponse({"error":f"{label} must be a JSON array"}, status_code=400)

    device_key = _device_key(request)
    _ensure_device(device_key)
    with db_conn() as conn:
        prior = conn.execute("SELECT user_id FROM allergen_profiles WHERE device_key=?",
                             (device_key,)).fetchone()
        owner_id = prior["user_id"] if prior else None
        conn.execute("INSERT OR REPLACE INTO allergen_profiles(device_key,user_id,allergens,conditions,updated_at) VALUES(?,?,?,?,datetime('now'))",
                     (device_key, owner_id, allergens, conditions))
    return {"status":"saved"}
| # ── Admin routes ─────────────────────────────────────────────────────── | |
async def admin_analytics(request: Request):
    """Return usage analytics; requires the admin token in ``X-Admin-Token``.

    Returns:
        Dict with daily/monthly active counts, scan and user totals, average
        score, DAU/MAU percentage, food DB counts and the ten most scanned
        product names.

    Raises:
        HTTPException: 403 when the token does not match ``ADMIN_TOKEN``.
    """
    token = request.headers.get("X-Admin-Token", "")
    # Constant-time comparison on bytes, so response timing does not leak the token.
    if not hmac.compare_digest(token.encode(), ADMIN_TOKEN.encode()):
        raise HTTPException(status_code=403, detail="Invalid token")

    today = datetime.date.today().isoformat()
    mkey = today[:7]
    with db_conn() as conn:
        dau = conn.execute("SELECT COUNT(DISTINCT COALESCE(user_id,device_key)) FROM scans WHERE DATE(scanned_at)=?", (today,)).fetchone()[0]
        mau = conn.execute("SELECT COUNT(DISTINCT COALESCE(user_id,device_key)) FROM scans WHERE strftime('%Y-%m',scanned_at)=?", (mkey,)).fetchone()[0]
        tot = conn.execute("SELECT COUNT(*) FROM scans").fetchone()[0]
        avgs = conn.execute("SELECT AVG(score) FROM scans").fetchone()[0]
        users = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
        food_ct = conn.execute("SELECT COUNT(*) FROM food_products").fetchone()[0]
        verified = conn.execute("SELECT COUNT(*) FROM food_products WHERE verified=1").fetchone()[0]
        top = conn.execute("SELECT product_name,COUNT(*) c FROM scans GROUP BY product_name ORDER BY c DESC LIMIT 10").fetchall()
    return {"dau":dau,"mau":mau,"total_scans":tot,"total_users":users,
            "avg_score":round(avgs or 0,2),"dau_mau":round(dau/mau*100,1) if mau else 0,
            "food_db":{"total":food_ct,"verified":verified},
            "top_products":[{"name":r[0],"scans":r[1]} for r in top]}
async def create_api_key_endpoint(request: Request, client_name: str=Form(...), plan: str=Form("business")):
    """Mint and store a new API key; requires the admin token in ``X-Admin-Token``.

    Returns:
        Dict with the new ``api_key`` plus the ``client`` and ``plan`` it was
        issued for.

    Raises:
        HTTPException: 403 when the token does not match ``ADMIN_TOKEN``.
    """
    token = request.headers.get("X-Admin-Token", "")
    # Constant-time comparison on bytes, so response timing does not leak the token.
    if not hmac.compare_digest(token.encode(), ADMIN_TOKEN.encode()):
        raise HTTPException(status_code=403, detail="Invalid admin token")

    key = "eak_" + secrets.token_urlsafe(32)
    with db_conn() as conn:
        conn.execute("INSERT INTO api_keys(api_key,client_name,plan) VALUES(?,?,?)", (key, client_name, plan))
    return {"api_key":key,"client":client_name,"plan":plan}
| # ── Misc routes ──────────────────────────────────────────────────────── | |
async def submit_nps(request: Request, score: int=Form(...), comment: str=Form("")):
    """Record an NPS response (score 0-10, comment capped at 500 characters)."""
    if score < 0 or score > 10:
        return JSONResponse({"error":"Score must be 0-10"}, status_code=400)

    account = _get_request_user(request)
    fingerprint = _device_key(request)
    respondent_id = account["id"] if account else None
    trimmed_comment = comment[:500]
    with db_conn() as conn:
        conn.execute(
            "INSERT INTO nps_responses(device_key,user_id,score,comment) VALUES(?,?,?,?)",
            (fingerprint, respondent_id, score, trimmed_comment))
    return {"status": "thank_you"}
async def generate_share_card(request: Request, product_name: str=Form(...), score: int=Form(...),
                              verdict: str=Form(...), top_warning: str=Form(""), top_pro: str=Form("")):
    """Render a 1080x1080 PNG share card for a scan and return it as a download.

    The score ring is green for scores of 7 and above, amber for 4-6 and red
    below 4. Long product names, verdicts and highlight lines are truncated.
    """
    W, H = 1080, 1080
    img = Image.new("RGB", (W, H), (15, 17, 23))
    draw = ImageDraw.Draw(img)
    font = ImageFont.load_default()
    s_rgb = (34,197,94) if score >= 7 else (245,158,11) if score >= 4 else (239,68,68)

    def centered(text, y, fill):
        """Draw ``text`` horizontally centred at height ``y``."""
        try:
            left, _, right, _ = font.getbbox(text)
            tw = right - left
        except Exception:
            # Was a bare ``except:``; keep the rough 6px-per-character estimate
            # without also trapping KeyboardInterrupt/SystemExit.
            tw = len(text) * 6
        try:
            draw.text(((W - tw) // 2, y), text, fill=fill, font=font)
        except UnicodeEncodeError:
            # NOTE(review): Pillow's legacy bitmap default font is believed to
            # accept Latin-1 only; degrade unsupported characters to "?"
            # rather than failing the whole card.
            fallback = text.encode("latin-1", "replace").decode("latin-1")
            draw.text(((W - tw) // 2, y), fallback, fill=fill, font=font)

    draw.ellipse([340,160,740,560], outline=s_rgb, width=18)
    centered(str(score), 340, s_rgb)
    centered("/10", 430, (100,116,139))
    centered(product_name[:38]+("β¦" if len(product_name)>38 else ""), 600, (255,255,255))
    centered(verdict[:50], 650, (148,163,184))
    if top_pro:
        draw.rectangle([60,700,1020,760], fill=(15,60,40))
        centered(f"β {top_pro[:65]}", 718, (74,222,128))
    if top_warning:
        draw.rectangle([60,775,1020,840], fill=(124,29,29))
        centered(f"β  {top_warning[:65]}", 795, (252,165,165))
    centered("eatlytic.com β’ scan any food label, no barcode needed", 1000, (71,85,105))

    buf = BytesIO()
    img.save(buf, format="PNG", optimize=True)
    return Response(content=buf.getvalue(), media_type="image/png",
                    headers={"Content-Disposition":"attachment; filename=eatlytic-scan.png"})
async def export_pdf(request: Request, analysis_json: str=Form(...)):
    """Render an analysis result (posted as a JSON object) to a PDF download.

    Returns:
        The PDF as an attachment, or a JSON error: 400 for a payload that is
        not a JSON object, 501 when reportlab is missing, 500 when the PDF
        build fails.
    """
    try:
        data = json.loads(analysis_json)
    except Exception:
        return JSONResponse({"error":"Invalid JSON"}, status_code=400)
    if not isinstance(data, dict):
        # Valid JSON that is not an object has no fields to render.
        return JSONResponse({"error":"Invalid JSON"}, status_code=400)
    try:
        from reportlab.lib.pagesizes import A4
        from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
        from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle
        from reportlab.lib import colors as rl
        from reportlab.lib.units import cm
    except ImportError:
        return JSONResponse({"error":"reportlab not installed"}, status_code=501)
    from xml.sax.saxutils import escape

    def _txt(value):
        """Escape caller-supplied text: Paragraph parses its input as markup."""
        return escape(str(value))

    def _items(value):
        """Normalise a pros/cons field to a list of entries."""
        if isinstance(value, (list, tuple)):
            return list(value)
        return [value] if value else []

    buf = BytesIO()
    doc = SimpleDocTemplate(buf, pagesize=A4, rightMargin=2*cm, leftMargin=2*cm,
                            topMargin=2*cm, bottomMargin=2*cm)
    stys = getSampleStyleSheet()
    story = []
    story.append(Paragraph("Eatlytic Food Label Analysis", stys["Title"]))
    story.append(Paragraph(f"Product: {_txt(data.get('product_name','Unknown'))}", stys["Heading2"]))
    story.append(Paragraph(MEDICAL_DISCLAIMER, ParagraphStyle("d", parent=stys["Normal"], fontSize=8, textColor=rl.grey)))
    story.append(Spacer(1, .4*cm))

    score = data.get("score", 0)
    try:
        score_num = float(score)
    except (TypeError, ValueError):
        score_num = 0  # non-numeric score: colour it as the lowest tier
    sc = "22c55e" if score_num >= 7 else "f59e0b" if score_num >= 4 else "ef4444"
    story.append(Paragraph(f"<font color='#{sc}'>Health Score: {_txt(score)}/10 β {_txt(data.get('verdict',''))}</font>", stys["Heading1"]))

    if data.get("summary"):
        story.append(Paragraph("Summary", stys["Heading2"]))
        story.append(Paragraph(_txt(data["summary"]), stys["Normal"]))

    nutrients = data.get("nutrient_breakdown") or []
    nutrients = [n for n in nutrients if isinstance(n, dict)] if isinstance(nutrients, list) else []
    if nutrients:
        story.append(Paragraph("Nutrient Breakdown", stys["Heading2"]))
        td = [["Nutrient","Amount","Rating"]] + [
            [str(n.get("name","")), f"{n.get('value','')} {n.get('unit','')}".strip(), str(n.get("rating","")).upper()]
            for n in nutrients]
        tbl = Table(td, colWidths=[6*cm,4*cm,4*cm])
        # HexColor() reads a string without a "#"/"0x" prefix as a decimal
        # integer, so the bare hex strings used before raised ValueError.
        tbl.setStyle(TableStyle([
            ("BACKGROUND",(0,0),(-1,0),rl.HexColor("#1D9E75")),
            ("TEXTCOLOR",(0,0),(-1,0),rl.white),
            ("FONTSIZE",(0,0),(-1,-1),10),
            ("BACKGROUND",(0,1),(-1,-1),rl.HexColor("#f8faf8")),
            ("GRID",(0,0),(-1,-1),.4,rl.HexColor("#d0d8d4")),
            ("TOPPADDING",(0,0),(-1,-1),6),
            ("BOTTOMPADDING",(0,0),(-1,-1),6),
            ("LEFTPADDING",(0,0),(-1,-1),8),
            ("RIGHTPADDING",(0,0),(-1,-1),8),
        ]))
        story.append(tbl)

    pros = _items(data.get("pros"))
    if pros:
        story.append(Paragraph("Benefits", stys["Heading2"]))
        for p in pros:
            story.append(Paragraph(f"β {_txt(p)}", stys["Normal"]))
    cons = _items(data.get("cons"))
    if cons:
        story.append(Paragraph("Concerns", stys["Heading2"]))
        for c in cons:
            story.append(Paragraph(f"β {_txt(c)}", stys["Normal"]))

    try:
        doc.build(story)
    except Exception as exc:
        return JSONResponse({"error":f"PDF failed: {exc}"}, status_code=500)

    # Header-safe filename: ASCII letters, digits, dot, underscore, dash only.
    safe = re.sub(r"[^A-Za-z0-9._-]+", "-", str(data.get("product_name") or "scan"))[:40].strip("-") or "scan"
    return Response(content=buf.getvalue(), media_type="application/pdf",
                    headers={"Content-Disposition":f"attachment; filename=eatlytic-{safe}.pdf"})