from fastapi import FastAPI, UploadFile, File, Form, Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer from PIL import Image from sqlalchemy.orm import Session from sqlalchemy import func from fastapi. middleware.cors import CORSMiddleware from datetime import datetime,timedelta from pydantic import BaseModel from typing import List from datetime import date as date_type,timedelta import os import uuid import io from app.core.config import settings from app.core.email import send_reset_code,send_support_email from app.core.utils import validate_email,validate_password,generate_code from app.core.security import decode_access_token, hash_password, verify_password, create_access_token from app.db.models import User, History,PasswordReset,WeightLog from app.db.database import Base, engine, SessionLocal from app.services.nutrition_estimator import NutritionEstimator from app.core.totp import ( generate_totp_secret, generate_qr_code, verify_totp_code, get_current_totp_code ) Base.metadata.create_all(bind=engine) def get_db(): db = SessionLocal() try: yield db finally: db.close() oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login") def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)): payload = decode_access_token(token) if not payload: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication credentials", ) if payload.get("temp"): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Temporary token not allowed. Please complete 2FA verification.", headers={"WWW-Authenticate": "Bearer"}, ) username = payload.get("sub") if not username: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication credentials", ) user = db.query(User).filter(User.username == username).first() if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found", ) return user app = FastAPI() cors_origins = os.getenv("CORS_ORIGINS", "http://localhost:5173").split(",") app.add_middleware( CORSMiddleware, allow_origins=cors_origins, allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) try: nutrition_estimator=NutritionEstimator() except Exception as e: print(f"Warning: Nutrition Estimator failed to initialize: {e}") nutrition_estimator=None @app.get("/") def root(): return {"message": "Food AI API is running"} @app.post("/register") def register(username:str=Form(...),password:str=Form(...),email:str=Form(...),db:Session=Depends(get_db)): if not validate_email(email): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid email format." ) is_valid, error_message = validate_password(password) if not is_valid: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=error_message ) existing_user=db.query(User).filter(User.username==username).first() if existing_user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Username already registered." ) existing_email=db.query(User).filter(User.email==email).first() if existing_email: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Email already registered." ) hashed_password=hash_password(password) user=User(username=username,email=email,hashed_password=hashed_password) db.add(user) db.commit() db.refresh(user) return {"message": "User created successfully"} @app.post("/login") def login(username:str=Form(...),password:str=Form(...),db:Session=Depends(get_db)): user=db.query(User).filter(User.username==username).first() if not user: user=db.query(User).filter(User.email==username).first() if not user or not verify_password(password,user.hashed_password): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username/email or password" ) if user.is_2fa_enabled: temp_token=create_access_token( data={"sub":user.username,"temp":True}, expires_delta=timedelta(minutes=5) ) return { "requires_2fa":True, "temp_token":temp_token, "message":"Please provide 2FA code" } access_token=create_access_token(data={"sub":user.username}) return {"access_token":access_token,"token_type":"bearer"} @app.post("/login/2fa") def login_2fa(temp_token:str=Form(...),code:str=Form(...),db:Session=Depends(get_db)): payload=decode_access_token(temp_token) if not payload or not payload.get("temp"): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired temporary token" ) username=payload.get("sub") if not username: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token" ) user=db.query(User).filter(User.username==username).first() if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found" ) if not user.is_2fa_enabled: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="2FA is not enabled for this account" ) if not verify_totp_code(user.totp_secret,code): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid 2FA code" ) access_token=create_access_token(data={"sub":user.username}) return {"access_token": access_token, "token_type": "bearer"} @app.post("/forgot-password") def forgot_password(email:str=Form(...),db:Session=Depends(get_db)): if not validate_email(email): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid email format." ) user=db.query(User).filter(User.email==email).first() if not user: return { "message": "If this email exists, a reset code has been sent.", "email": email } reset_code=generate_code() expires_at=datetime.utcnow()+timedelta(minutes=settings.RESET_CODE_EXPIRE_MINUTES) db.query(PasswordReset).filter(PasswordReset.user_id==user.id,PasswordReset.is_used==0).delete() password_reset=PasswordReset(user_id=user.id,reset_code=reset_code,expires_at=expires_at,is_used=0) db.add(password_reset) db.commit() email_sent=send_reset_code(user.email,reset_code) if not email_sent: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to send email. Please try again later." ) return { "message": "Reset code sent to your email.", "email": email, "expires_in_minutes": settings.RESET_CODE_EXPIRE_MINUTES } @app.post("/reset-password") def reset_password(email:str=Form(...),reset_code:str=Form(...),new_password:str=Form(...),db:Session=Depends(get_db)): user=db.query(User).filter(User.email==email).first() if not user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid email or reset code." ) reset_entry=db.query(PasswordReset).filter(PasswordReset.user_id==user.id,PasswordReset.reset_code==reset_code,PasswordReset.is_used==0).first() if not reset_entry: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid or expired reset code." ) if datetime.utcnow() > reset_entry.expires_at: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Reset code has expired. Please request a new one." ) is_valid, error_message = validate_password(new_password) if not is_valid: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=error_message ) user.hashed_password=hash_password(new_password) reset_entry.is_used = 1 db.commit() return { "message": "Password reset successfully.", "email": email } @app.post("/2fa/enable") def enable_2fa(db:Session=Depends(get_db),current_user:User=Depends(get_current_user)): if current_user.is_2fa_enabled: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="2FA is already enabled for this account." ) secret=generate_totp_secret() current_user.totp_secret=secret db.commit() qr_code=generate_qr_code(username=current_user.email,secret=secret,issuer="CalorieTrackerAi") return { "message": "Scan the QR code with Google Authenticator", "qr_code": qr_code, "instructions": [ "1. Install Google Authenticator on your phone", "2. Open the QR code URL in browser", "3. Scan the QR code with Google Authenticator", "4. Enter the 6-digit code to confirm" ] } @app.post("/2fa/disable") def disable_2fa(db:Session=Depends(get_db),current_user:User=Depends(get_current_user),code:str=Form(...)): if not current_user.is_2fa_enabled: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="2FA is not enabled for this account." ) if not verify_totp_code(current_user.totp_secret, code): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid verification code." ) current_user.is_2fa_enabled=False current_user.totp_secret=None db.commit() return { "message": "2FA disabled successfully.", "email": current_user.email } @app.post("/2fa/verify") def verify_2fa(db:Session=Depends(get_db),code:str=Form(...),current_user:User=Depends(get_current_user)): if not current_user.totp_secret: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="2FA setup not initiated. Call /2fa/enable first." ) if current_user.is_2fa_enabled: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="2FA is already enabled." ) if not verify_totp_code(current_user.totp_secret, code): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid verification code." ) current_user.is_2fa_enabled=True db.commit() return { "message": "2FA enabled successfully! ", "email": current_user.email } @app.post("/predict") async def predict(file: UploadFile=File(...),db:Session=Depends(get_db),current_user:User=Depends(get_current_user)): try: if nutrition_estimator is None: raise HTTPException( status_code=500, detail="Nutrition analysis not available" ) contents = await file.read() allowed_extensions = {'png','jpg','jpeg','gif','bmp'} file_ext = file.filename.rsplit('.', 1)[1].lower() if '.' in file.filename else '' if file_ext not in allowed_extensions: raise HTTPException( status_code=400, detail=f"Invalid file type.Allowed: {', '.join(allowed_extensions)}" ) upload_folder='uploads' os.makedirs(upload_folder,exist_ok=True) unique_filename=f"{uuid.uuid4()}_{file.filename}" temp_path=os.path.join(upload_folder,unique_filename) with open(temp_path,'wb') as f: f.write(contents) try: from starlette.concurrency import run_in_threadpool result = await run_in_threadpool(nutrition_estimator.analyze_image, temp_path) if result is None: raise HTTPException( status_code=500, detail="Analysis returned no result (internal error)" ) try: os.remove(temp_path) except: pass if not result['success']: raise HTTPException( status_code=500, detail=f"Analysis failed: {result.get('error','Unknown error')}" ) final_result=result["final_result"] final_foods=final_result["final_foods"] foods_list=[] for food_item in final_foods: food_name=food_item['name'] nutrition=food_item.get('nutrition',{}) if nutrition is None: nutrition={} estimated_grams=nutrition.get('estimated_weight_grams',0) or 0 estimated_calories=nutrition.get('calories_total',0) or 0 macros=nutrition.get('macronutrients',{}) if macros is None: macros={} protein_g=macros.get('protein_g',0.0) or 0.0 carbs_g=macros.get('carbs_g',0.0) or 0.0 fat_g=macros.get('fat_g',0.0) or 0.0 foods_list.append({"name":food_name, "grams":estimated_grams, "calories":estimated_calories, "protein_g":protein_g, "carbs_g":carbs_g, "fat_g":fat_g}) return { "success":True, "foods_detected":foods_list, "total_nutrition":final_result['total_nutrition'], "reconciliation_info":final_result['reconciliation_summary'] } except HTTPException: raise except Exception as e: try: os.remove(temp_path) except: pass raise HTTPException( status_code=500, detail=f"Analysis failed: {str(e)}" ) except HTTPException: raise except Exception as e: raise HTTPException( status_code=500, detail=f"Server error: {str(e)}" ) class FoodItemInput(BaseModel): name:str grams:int calories:float protein_g:float carbs_g:float fat_g:float class SaveMealRequest(BaseModel): foods:List[FoodItemInput] meal_type:str meal_time:str log_date:str @app.post("/history/save") def save_meal(request:SaveMealRequest,db:Session=Depends(get_db),current_user:User=Depends(get_current_user)): try: valid_meal_types=["Breakfast","Lunch","Snack","Dinner"] if request.meal_type not in valid_meal_types: raise HTTPException( status_code=400, detail=f"Invalid meal_type. Must be one of: {', '.join(valid_meal_types)}" ) try: log_date = datetime.fromisoformat(request.log_date) except ValueError: raise HTTPException( status_code=400, detail="Invalid log_date format. Use ISO format: YYYY-MM-DD" ) saved_items=[] for food in request.foods: history=History( label=food.name, grams=food.grams, calories=food.calories, protein_g=food.protein_g, carbs_g=food.carbs_g, fat_g=food.fat_g, meal_type=request.meal_type, meal_time=request.meal_time, log_date=log_date, user_id=current_user.id ) db.add(history) saved_items.append({ "name": food.name, "grams": food.grams, "calories": food.calories }) db.commit() return { "success": True, "message": f"Meal logged successfully ({len(saved_items)} items)", "meal_type": request.meal_type, "meal_time": request.meal_time, "log_date": request.log_date, "items_saved": len(saved_items), "foods": saved_items } except HTTPException: raise except Exception as e: db.rollback() raise HTTPException( status_code=500, detail=f"Failed to save meal: {str(e)}" ) @app.get("/history") def get_history(date:str=None,db:Session=Depends(get_db),current_user=Depends(get_current_user)): query=db.query(History).filter(History.user_id==current_user.id) if date: try: target_date=datetime.fromisoformat(date) query=query.filter(func.date(History.log_date)==target_date.date()) except ValueError: raise HTTPException( status_code=400, detail="Invalid date format. Use YYYY-MM-DD" ) history = query.order_by(History.created_at.desc()).all() return { "success": True, "count": len(history), "date_filter": date if date else "all", "history": history } class UpdateMealRequest(BaseModel): grams: int calories: float protein_g: float carbs_g: float fat_g: float @app.put("/history/item/{meal_id}") def update_meal(meal_id:int,request:UpdateMealRequest,db:Session=Depends(get_db),current_user: User = Depends(get_current_user)): meal=db.query(History).filter(History.id==meal_id,History.user_id==current_user.id).first() if not meal: raise HTTPException( status_code=404, detail="Meal not found" ) meal.grams = request.grams meal.calories = request.calories meal.protein_g = request.protein_g meal.carbs_g = request.carbs_g meal.fat_g = request.fat_g db.commit() db.refresh(meal) return { "success": True, "message": "Meal updated successfully", "id": meal.id } @app.delete("/history/item/{meal_id}") def update_meal(meal_id:int,db:Session=Depends(get_db),current_user: User = Depends(get_current_user)): meal = db.query(History).filter(History.id == meal_id, History.user_id == current_user.id).first() if not meal: raise HTTPException( status_code=404, detail="Meal not found" ) db.delete(meal) db.commit() return { "success":True, "message":"Meal deleted successfully", "id":meal_id } class UpdateProfileRequest(BaseModel): full_name:str|None=None age:int | None = None gender:str |None = None height_cm: float| None = None weight_kg:float| None = None activity_level:str | None = None username:str | None = None email:str|None = None goal_type:str|None = None calorie_goal_mode:str|None = None calorie_goal_manual:float|None = None weight_goal_kg: float |None = None @app.get("/profile") def get_profile(current_user:User=Depends(get_current_user)): return{"success":True, "id":current_user.id, "username":current_user.username, "email":current_user.email, "full_name":current_user.full_name, "age":current_user.age, "gender":current_user.gender, "height_cm":current_user.height_cm, "weight_kg":current_user.weight_kg, "activity_level":current_user.activity_level, "goal_type": current_user.goal_type, "calorie_goal_mode": current_user.calorie_goal_mode, "calorie_goal_manual": current_user.calorie_goal_manual, "weight_goal_kg": current_user.weight_goal_kg, "is_2fa_enabled":current_user.is_2fa_enabled, "setup_completed":current_user.setup_completed, } @app.put("/profile") def update_profile(request:UpdateProfileRequest,db:Session=Depends(get_db),current_user:User=Depends(get_current_user)): if request.username and request.username!=current_user.username: existing=db.query(User).filter(User.username==request.username).firt() if existing: raise HTTPException(status_code=400, detail="Username already taken.") current_user.username = request.username if request.email and request.email != current_user.email: if not validate_email(request.email): raise HTTPException(status_code=400, detail="Invalid email format.") existing = db.query(User).filter(User.email == request.email).first() if existing: raise HTTPException(status_code=400, detail="Email already taken.") current_user.email = request.email old_goal_type = current_user.goal_type if request.full_name is not None: current_user.full_name = request.full_name if request.age is not None: current_user.age = request.age if request.gender is not None: current_user.gender = request.gender if request.height_cm is not None: current_user.height_cm = request.height_cm if request.activity_level is not None: current_user.activity_level = request.activity_level if request.goal_type is not None: current_user.goal_type = request.goal_type if request.calorie_goal_mode is not None: current_user.calorie_goal_mode = request.calorie_goal_mode if request.calorie_goal_manual is not None: current_user.calorie_goal_manual = request.calorie_goal_manual if request.weight_goal_kg is not None: current_user.weight_goal_kg = request.weight_goal_kg if request.weight_kg is not None: current_user.weight_kg = request.weight_kg today = date_type.today() existing = db.query(WeightLog).filter( WeightLog.user_id == current_user.id, WeightLog.date == today ).first() if existing: existing.weight_kg = request.weight_kg else: db.add(WeightLog( user_id=current_user.id, date=today, weight_kg=request.weight_kg )) if request.goal_type is not None and request.goal_type != old_goal_type: current_user.start_weight = current_user.weight_kg today = date_type.today() existing = db.query(WeightLog).filter( WeightLog.user_id == current_user.id, WeightLog.date == today ).first() if existing: existing.weight_kg = current_user.weight_kg else: db.add(WeightLog(user_id=current_user.id, date=today, weight_kg=current_user.weight_kg)) db.commit() db.refresh(current_user) return { "success": True, "message": "Profile updated successfully", "username": current_user.username, "email": current_user.email, "full_name": current_user.full_name, "age": current_user.age, "gender": current_user.gender, "height_cm": current_user.height_cm, "weight_kg": current_user.weight_kg, "activity_level": current_user.activity_level, } @app.post("/support") def send_support_message(message:str=Form(...),current_user:User=Depends(get_current_user),db:Session=Depends(get_db)): if not message.strip(): raise HTTPException( status_code=400, detail="Message cannot be empty." ) email_sent=send_support_email(user_email=current_user.email,username=current_user.username,message=message) if not email_sent: raise HTTPException( status_code=500, detail="Failed to send message. Please try again later." ) return {"success": True, "message": "Message sent successfully!"} class SetupRequest(BaseModel): age:int gender:str weight_kg:float height_cm:float activity_level:str goal_type:str weight_goal_kg:float|None=None @app.post('/setup/complete') def complete_setup(request: SetupRequest,db: Session = Depends(get_db),current_user: User = Depends(get_current_user)): if request.gender not in ['male', 'female']: raise HTTPException(status_code=400, detail="Invalid gender.") if request.activity_level not in ['sedentary', 'light', 'moderate', 'active']: raise HTTPException(status_code=400, detail="Invalid activity level.") if request.goal_type not in ['lose', 'maintain', 'gain']: raise HTTPException(status_code=400, detail="Invalid goal type.") if request.goal_type in ['lose', 'gain'] and not request.weight_goal_kg: raise HTTPException(status_code=400, detail="Target weight is required for this goal.") current_user.age = request.age current_user.gender = request.gender current_user.weight_kg = request.weight_kg current_user.height_cm = request.height_cm current_user.activity_level = request.activity_level current_user.goal_type = request.goal_type current_user.weight_goal_kg = request.weight_goal_kg current_user.start_weight=request.weight_kg current_user.setup_completed = True first_log=WeightLog(user_id=current_user.id, weight_kg=request.weight_kg, date=date_type.today(),) db.add(first_log) db.commit() return{"success":True,"message":"Setup completed successfully!"} @app.post("/weight-log") def add_weight_log(weight_kg:float=Form(...),db:Session=Depends(get_db),current_user:User=Depends(get_current_user)): if weight_kg < 20 or weight_kg > 300: raise HTTPException(status_code=400, detail="Invalid weight value.") today = date_type.today() existing=db.query(WeightLog).filter(WeightLog.user_id==current_user.id,WeightLog.date==today).first() if existing: existing.weight_kg=weight_kg else: log=WeightLog(user_id=current_user.id, weight_kg=weight_kg, date=today, ) db.add(log) current_user.weight_kg=weight_kg db.commit() return {"success": True, "message": "Weight logged successfully!", "weight_kg": weight_kg} @app.get("/progress") def get_progress(db:Session=Depends(get_db),current_user:User=Depends(get_current_user)): def calculate_tdee(user): if not user.weight_kg or not user.height_cm or not user.age or not user.gender: return 2000 if user.gender == 'male': bmr = 10 * user.weight_kg + 6.25 * user.height_cm - 5 * user.age + 5 else: bmr = 10 * user.weight_kg + 6.25 * user.height_cm - 5 * user.age - 161 multipliers = { 'sedentary': 1.2, 'light': 1.375, 'moderate': 1.55, 'active': 1.725, } tdee = bmr * multipliers.get(user.activity_level, 1.2) if user.goal_type == 'lose': tdee -= 500 elif user.goal_type == 'gain': tdee += 300 return round(tdee) if current_user.calorie_goal_mode == 'manual' and current_user.calorie_goal_manual: calorie_goal = current_user.calorie_goal_manual else: calorie_goal = calculate_tdee(current_user) weight_logs = db.query(WeightLog).filter( WeightLog.user_id == current_user.id ).order_by(WeightLog.date.asc()).all() start_weight=current_user.start_weight or current_user.weight_kg or 0 current_weight = weight_logs[-1].weight_kg if weight_logs else (current_user.weight_kg or 0) goal_weight=current_user.weight_goal_kg goal_type=current_user.goal_type or 'maintain' if goal_type=='lose' and goal_weight: total=round(start_weight - goal_weight, 1) changed=round(start_weight - current_weight, 1) remaining=round(current_weight - goal_weight, 1) progress=round((changed / total * 100), 1) if total > 0 else 0 elif goal_type=='gain' and goal_weight: total=round(goal_weight - start_weight, 1) changed=round(current_weight - start_weight, 1) remaining=round(goal_weight - current_weight, 1) progress=round((changed / total * 100), 1) if total > 0 else 0 else: total=0 changed=0 remaining=0 progress=100 all_history=db.query(History).filter(History.user_id==current_user.id).order_by(History.log_date.asc()).all() distinct_days=len(set(h.log_date.date() for h in all_history if h.log_date)) today=date_type.today() streak=0 check_day=today logged_days=set(h.log_date.date() for h in all_history) while check_day in logged_days: streak+=1 check_day-=timedelta(days=1) if all_history: total_cals=sum(h.calories or 0 for h in all_history) avg_calories=round(total_cals / distinct_days) if distinct_days>0 else 0 else: avg_calories=0 week_ago=today-timedelta(days=7) week_history=[h for h in all_history if h.log_date and h.log_date.date()>=week_ago] weekly_calories=sum(h.calories or 0 for h in week_history) week_days=len(set(h.log_date.date() for h in week_history if h.log_date)) if week_days>0 and calorie_goal: avg_weekly_cal=weekly_calories/week_days daily_diff=avg_weekly_cal-calorie_goal weekly_change=round((daily_diff*7)/7700,2) else: weekly_change=0.0 weight_trend=[] for log in weight_logs: weight_trend.append({ "date": log.date.isoformat(), "label":log.date.strftime("%b %d"), "weight":float(log.weight_kg), }) if not weight_trend: weight_trend.append({ "label": today.strftime("%b %d"), "weight":float(current_weight), }) days_labels = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'] calories_week = [] for i in range(7): day = today - timedelta(days=today.weekday() - i) day_history = [h for h in all_history if h.log_date and h.log_date.date() == day] day_cals = sum(h.calories or 0 for h in day_history) calories_week.append({ "label": days_labels[i], "calories": day_cals, }) return { "success": True, "start_weight": start_weight, "current_weight": current_weight, "goal_weight": goal_weight, "goal_type": goal_type, "changed_so_far": max(changed, 0), "remaining": max(remaining, 0), "progress_pct": min(max(progress, 0), 100), "days_tracking": distinct_days, "streak": streak, "avg_calories": avg_calories, "weekly_change": weekly_change, "calorie_goal": calorie_goal, "weight_trend": weight_trend, "calories_week": calories_week, }