Prathamesh Sable commited on
Commit
38c555b
·
1 Parent(s): b8c6f19

Add requirements for OCR, barcode detection, database management, and caching

Browse files

---

For more details, open the [Copilot Workspace session](https://copilot-workspace.githubnext.com/prathameshks/FoodAnalyzer-API?shareId=XXXX-XXXX-XXXX-XXXX).

database.py ADDED
@@ -0,0 +1,19 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from sqlalchemy import create_engine
2
+ from sqlalchemy.ext.declarative import declarative_base
3
+ from sqlalchemy.orm import sessionmaker
4
+
5
+ SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
6
+
7
+ engine = create_engine(
8
+ SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
9
+ )
10
+ SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
11
+
12
+ Base = declarative_base()
13
+
14
+ def get_db():
15
+ db = SessionLocal()
16
+ try:
17
+ yield db
18
+ finally:
19
+ db.close()
main.py CHANGED
@@ -1,10 +1,16 @@
1
  from fastapi import FastAPI
2
  from routes.extract_product_info_from_barcode import router as extract_product_info_router
3
  from routes.fetch_product_data import router as fetch_product_data_router
 
 
 
4
 
5
  app = FastAPI()
6
 
7
  app.include_router(extract_product_info_router, prefix="/api")
8
  app.include_router(fetch_product_data_router, prefix="/api")
 
 
 
9
 
10
- # To run the FastAPI app, use the command: uvicorn main:app --reload
 
1
  from fastapi import FastAPI
2
  from routes.extract_product_info_from_barcode import router as extract_product_info_router
3
  from routes.fetch_product_data import router as fetch_product_data_router
4
+ from routes.auth import router as auth_router
5
+ from routes.analysis import router as analysis_router
6
+ from routes.history import router as history_router
7
 
8
  app = FastAPI()
9
 
10
  app.include_router(extract_product_info_router, prefix="/api")
11
  app.include_router(fetch_product_data_router, prefix="/api")
12
+ app.include_router(auth_router, prefix="/api")
13
+ app.include_router(analysis_router, prefix="/api")
14
+ app.include_router(history_router, prefix="/api")
15
 
16
+ # To run the FastAPI app, use the command: uvicorn main:app --reload
models/ingredient.py ADDED
@@ -0,0 +1,9 @@
 
 
 
 
 
 
 
 
 
 
1
+ from sqlalchemy import Column, Integer, String, JSON
2
+ from database import Base
3
+
4
+ class Ingredient(Base):
5
+ __tablename__ = "ingredients"
6
+
7
+ id = Column(Integer, primary_key=True, index=True)
8
+ name = Column(String, unique=True, index=True)
9
+ nutritional_info = Column(JSON)
models/scan_history.py ADDED
@@ -0,0 +1,13 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from sqlalchemy import Column, Integer, ForeignKey, DateTime
2
+ from sqlalchemy.orm import relationship
3
+ from database import Base
4
+
5
+ class ScanHistory(Base):
6
+ __tablename__ = "scan_history"
7
+
8
+ id = Column(Integer, primary_key=True, index=True)
9
+ user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
10
+ product_id = Column(Integer, nullable=False)
11
+ scan_date = Column(DateTime, nullable=False)
12
+
13
+ user = relationship("User", back_populates="scan_history")
models/user.py ADDED
@@ -0,0 +1,10 @@
 
 
 
 
 
 
 
 
 
 
 
1
+ from sqlalchemy import Column, Integer, String
2
+ from database import Base
3
+
4
+ class User(Base):
5
+ __tablename__ = "users"
6
+
7
+ id = Column(Integer, primary_key=True, index=True)
8
+ username = Column(String, unique=True, index=True)
9
+ email = Column(String, unique=True, index=True)
10
+ hashed_password = Column(String)
models/user_preferences.py ADDED
@@ -0,0 +1,15 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from sqlalchemy import Column, Integer, String, ForeignKey
2
+ from sqlalchemy.orm import relationship
3
+ from database import Base
4
+
5
+ class UserPreferences(Base):
6
+ __tablename__ = "user_preferences"
7
+
8
+ id = Column(Integer, primary_key=True, index=True)
9
+ user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
10
+ dietary_restrictions = Column(String, nullable=True)
11
+ allergens = Column(String, nullable=True)
12
+ preferred_ingredients = Column(String, nullable=True)
13
+ disliked_ingredients = Column(String, nullable=True)
14
+
15
+ user = relationship("User", back_populates="preferences")
requirements.txt CHANGED
@@ -1,4 +1,9 @@
1
  fastapi
2
  uvicorn
3
  pydantic
4
- requests
 
 
 
 
 
 
1
  fastapi
2
  uvicorn
3
  pydantic
4
+ requests
5
+ pytesseract
6
+ opencv-python
7
+ sqlalchemy
8
+ alembic
9
+ redis
routers/analysis.py ADDED
@@ -0,0 +1,26 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import APIRouter, Depends, HTTPException
2
+ from sqlalchemy.orm import Session
3
+ from typing import List, Dict, Any
4
+ from database import get_db
5
+ from models.user import User
6
+ from models.ingredient import Ingredient
7
+ from services.analysis_agent import analyze_ingredients, provide_personalized_recommendations
8
+ from services.auth_service import get_current_user
9
+
10
+ router = APIRouter()
11
+
12
+ @router.post("/analyze_ingredients")
13
+ def analyze_ingredients_endpoint(ingredients: List[Dict[str, Any]], db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
14
+ try:
15
+ analysis_results = analyze_ingredients(db, ingredients, current_user.id)
16
+ return analysis_results
17
+ except Exception as e:
18
+ raise HTTPException(status_code=500, detail=str(e))
19
+
20
+ @router.get("/personalized_recommendations")
21
+ def personalized_recommendations_endpoint(db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
22
+ try:
23
+ recommendations = provide_personalized_recommendations(db, current_user.id)
24
+ return recommendations
25
+ except Exception as e:
26
+ raise HTTPException(status_code=500, detail=str(e))
routers/auth.py ADDED
@@ -0,0 +1,46 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import APIRouter, Depends, HTTPException, status
2
+ from sqlalchemy.orm import Session
3
+ from fastapi.security import OAuth2PasswordRequestForm
4
+ from pydantic import BaseModel
5
+ from database import get_db
6
+ from services.auth_service import authenticate_user, create_access_token, create_user, get_current_active_user
7
+ from datetime import timedelta
8
+
9
+ router = APIRouter()
10
+
11
+ class UserCreate(BaseModel):
12
+ username: str
13
+ email: str
14
+ password: str
15
+
16
+ class Token(BaseModel):
17
+ access_token: str
18
+ token_type: str
19
+
20
+ @router.post("/register", response_model=Token)
21
+ def register(user: UserCreate, db: Session = Depends(get_db)):
22
+ db_user = create_user(db, user.username, user.email, user.password)
23
+ access_token_expires = timedelta(minutes=30)
24
+ access_token = create_access_token(
25
+ data={"sub": db_user.username}, expires_delta=access_token_expires
26
+ )
27
+ return {"access_token": access_token, "token_type": "bearer"}
28
+
29
+ @router.post("/login", response_model=Token)
30
+ def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
31
+ user = authenticate_user(db, form_data.username, form_data.password)
32
+ if not user:
33
+ raise HTTPException(
34
+ status_code=status.HTTP_401_UNAUTHORIZED,
35
+ detail="Incorrect username or password",
36
+ headers={"WWW-Authenticate": "Bearer"},
37
+ )
38
+ access_token_expires = timedelta(minutes=30)
39
+ access_token = create_access_token(
40
+ data={"sub": user.username}, expires_delta=access_token_expires
41
+ )
42
+ return {"access_token": access_token, "token_type": "bearer"}
43
+
44
+ @router.get("/users/me", response_model=UserCreate)
45
+ def read_users_me(current_user: UserCreate = Depends(get_current_active_user)):
46
+ return current_user
routers/history.py ADDED
@@ -0,0 +1,30 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import APIRouter, Depends, HTTPException, status
2
+ from sqlalchemy.orm import Session
3
+ from pydantic import BaseModel
4
+ from database import get_db
5
+ from services.scan_history import record_scan, get_scan_history
6
+ from models.scan_history import ScanHistory
7
+
8
+ router = APIRouter()
9
+
10
+ class ScanHistoryCreate(BaseModel):
11
+ user_id: int
12
+ product_id: int
13
+
14
+ class ScanHistoryResponse(BaseModel):
15
+ id: int
16
+ user_id: int
17
+ product_id: int
18
+ scan_date: str
19
+
20
+ @router.post("/scan", response_model=ScanHistoryResponse)
21
+ def create_scan(scan: ScanHistoryCreate, db: Session = Depends(get_db)):
22
+ scan_entry = record_scan(db, scan.user_id, scan.product_id)
23
+ return scan_entry
24
+
25
+ @router.get("/history/{user_id}", response_model=list[ScanHistoryResponse])
26
+ def read_scan_history(user_id: int, db: Session = Depends(get_db)):
27
+ scan_history = get_scan_history(db, user_id)
28
+ if not scan_history:
29
+ raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Scan history not found")
30
+ return scan_history
services/ai_agent.py ADDED
@@ -0,0 +1,88 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from sqlalchemy.orm import Session
2
+ from fastapi import HTTPException
3
+ from utils import fetch_product_data_from_api, save_json_file
4
+ from models.ingredient import Ingredient
5
+ from services.ingredients import get_ingredient_by_name, save_ingredient_data
6
+ from typing import Dict, Any
7
+ import json
8
+
9
+ def preprocess_data(barcode: str) -> Dict[str, Any]:
10
+ data = fetch_product_data_from_api(barcode)
11
+ product = data.get('product', {})
12
+
13
+ product_info = {
14
+ "product_name": product.get('product_name_en', product.get('product_name', 'N/A')),
15
+ "generic_name": product.get('generic_name_en', product.get('generic_name', 'N/A')),
16
+ "brands": product.get('brands', 'N/A'),
17
+ "ingredients": [],
18
+ "ingredients_text": product.get('ingredients_text_en', product.get('ingredients_text', 'N/A')),
19
+ "ingredients_analysis": product.get('ingredients_analysis', {}),
20
+ "nutriscore": product.get('nutriscore', {}),
21
+ "nutrient_levels": product.get('nutrient_levels', {}),
22
+ "nutriments": product.get('nutriments', {}),
23
+ "data_quality_warnings": product.get('data_quality_warnings_tags', [])
24
+ }
25
+
26
+ ingredients_list = product.get('ingredients', [])
27
+ for ingredient in ingredients_list:
28
+ ingredient_info = {
29
+ "text": ingredient.get('text', 'N/A'),
30
+ "percent": ingredient.get('percent', ingredient.get('percent_estimate', 'N/A')),
31
+ "vegan": ingredient.get('vegan', 'N/A'),
32
+ "vegetarian": ingredient.get('vegetarian', 'N/A'),
33
+ "sub_ingredients": []
34
+ }
35
+ sub_ingredients = ingredient.get('ingredients', [])
36
+ for sub_ingredient in sub_ingredients:
37
+ sub_ingredient_info = {
38
+ "text": sub_ingredient.get('text', 'N/A'),
39
+ "percent": sub_ingredient.get('percent', sub_ingredient.get('percent_estimate', 'N/A')),
40
+ "vegan": sub_ingredient.get('vegan', 'N/A'),
41
+ "vegetarian": sub_ingredient.get('vegetarian', 'N/A')
42
+ }
43
+ ingredient_info["sub_ingredients"].append(sub_ingredient_info)
44
+ product_info["ingredients"].append(ingredient_info)
45
+
46
+ return product_info
47
+
48
+ def validate_data(data: Dict[str, Any]) -> bool:
49
+ required_fields = ["product_name", "generic_name", "brands", "ingredients", "nutriscore", "nutrient_levels", "nutriments"]
50
+ for field in required_fields:
51
+ if field not in data or not data[field]:
52
+ return False
53
+ return True
54
+
55
+ def clean_data(data: Dict[str, Any]) -> Dict[str, Any]:
56
+ for ingredient in data["ingredients"]:
57
+ if "percent" in ingredient and ingredient["percent"] == "N/A":
58
+ ingredient["percent"] = 0
59
+ for sub_ingredient in ingredient["sub_ingredients"]:
60
+ if "percent" in sub_ingredient and sub_ingredient["percent"] == "N/A":
61
+ sub_ingredient["percent"] = 0
62
+ return data
63
+
64
+ def standardize_data(data: Dict[str, Any]) -> Dict[str, Any]:
65
+ for ingredient in data["ingredients"]:
66
+ ingredient["text"] = ingredient["text"].lower()
67
+ for sub_ingredient in ingredient["sub_ingredients"]:
68
+ sub_ingredient["text"] = sub_ingredient["text"].lower()
69
+ return data
70
+
71
+ def enrich_data(db: Session, data: Dict[str, Any]) -> Dict[str, Any]:
72
+ for ingredient in data["ingredients"]:
73
+ ingredient_data = get_ingredient_by_name(db, ingredient["text"])
74
+ if not ingredient_data:
75
+ ingredient_data = fetch_product_data_from_api(ingredient["text"])
76
+ save_ingredient_data(db, ingredient["text"], ingredient_data)
77
+ ingredient["nutritional_info"] = ingredient_data
78
+ return data
79
+
80
+ def process_data(db: Session, barcode: str) -> Dict[str, Any]:
81
+ data = preprocess_data(barcode)
82
+ if not validate_data(data):
83
+ raise HTTPException(status_code=400, detail="Invalid data")
84
+ data = clean_data(data)
85
+ data = standardize_data(data)
86
+ data = enrich_data(db, data)
87
+ save_json_file(barcode, data)
88
+ return data
services/analysis_agent.py ADDED
@@ -0,0 +1,48 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from sqlalchemy.orm import Session
2
+ from fastapi import HTTPException
3
+ from typing import Dict, Any, List
4
+ from models.user_preferences import UserPreferences
5
+ from services.ingredients import get_ingredient_data, filter_ingredients_by_preferences
6
+
7
+ def analyze_ingredients(db: Session, ingredients: List[Dict[str, Any]], user_id: int) -> Dict[str, Any]:
8
+ preferences = db.query(UserPreferences).filter(UserPreferences.user_id == user_id).first()
9
+ if not preferences:
10
+ raise HTTPException(status_code=404, detail="User preferences not found")
11
+
12
+ filtered_ingredients = filter_ingredients_by_preferences(ingredients, preferences)
13
+ analysis_results = {
14
+ "safe_ingredients": [],
15
+ "unsafe_ingredients": [],
16
+ "additional_facts": []
17
+ }
18
+
19
+ for ingredient in filtered_ingredients:
20
+ ingredient_data = get_ingredient_data(db, ingredient["text"])
21
+ if ingredient_data:
22
+ analysis_results["safe_ingredients"].append({
23
+ "name": ingredient["text"],
24
+ "nutritional_info": ingredient_data
25
+ })
26
+ else:
27
+ analysis_results["unsafe_ingredients"].append({
28
+ "name": ingredient["text"],
29
+ "reason": "Information not found"
30
+ })
31
+
32
+ return analysis_results
33
+
34
+ def provide_personalized_recommendations(db: Session, user_id: int) -> Dict[str, Any]:
35
+ preferences = db.query(UserPreferences).filter(UserPreferences.user_id == user_id).first()
36
+ if not preferences:
37
+ raise HTTPException(status_code=404, detail="User preferences not found")
38
+
39
+ recommended_ingredients = []
40
+ all_ingredients = db.query(Ingredient).all()
41
+ for ingredient in all_ingredients:
42
+ if ingredient.name not in preferences.disliked_ingredients:
43
+ recommended_ingredients.append({
44
+ "name": ingredient.name,
45
+ "nutritional_info": ingredient.nutritional_info
46
+ })
47
+
48
+ return {"recommended_ingredients": recommended_ingredients}
services/auth_service.py ADDED
@@ -0,0 +1,87 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from passlib.context import CryptContext
2
+ from jose import JWTError, jwt
3
+ from datetime import datetime, timedelta
4
+ from fastapi import Depends, HTTPException, status
5
+ from fastapi.security import OAuth2PasswordBearer
6
+ from sqlalchemy.orm import Session
7
+ from database import get_db
8
+ from models.user import User
9
+ from pydantic import BaseModel
10
+
11
+ # to get a string like this run:
12
+ # openssl rand -hex 32
13
+ SECRET_KEY = "09d8f7a6b5c4e3d2f1a0b9c8d7e6f5a4"
14
+ ALGORITHM = "HS256"
15
+ ACCESS_TOKEN_EXPIRE_MINUTES = 30
16
+
17
+ pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
18
+ oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
19
+
20
+ class Token(BaseModel):
21
+ access_token: str
22
+ token_type: str
23
+
24
+ class TokenData(BaseModel):
25
+ username: str | None = None
26
+
27
+ class UserInDB(User):
28
+ hashed_password: str
29
+
30
+ def verify_password(plain_password, hashed_password):
31
+ return pwd_context.verify(plain_password, hashed_password)
32
+
33
+ def get_password_hash(password):
34
+ return pwd_context.hash(password)
35
+
36
+ def get_user(db, username: str):
37
+ return db.query(User).filter(User.username == username).first()
38
+
39
+ def authenticate_user(db, username: str, password: str):
40
+ user = get_user(db, username)
41
+ if not user:
42
+ return False
43
+ if not verify_password(password, user.hashed_password):
44
+ return False
45
+ return user
46
+
47
+ def create_access_token(data: dict, expires_delta: timedelta | None = None):
48
+ to_encode = data.copy()
49
+ if expires_delta:
50
+ expire = datetime.utcnow() + expires_delta
51
+ else:
52
+ expire = datetime.utcnow() + timedelta(minutes=15)
53
+ to_encode.update({"exp": expire})
54
+ encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
55
+ return encoded_jwt
56
+
57
+ async def get_current_user(db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)):
58
+ credentials_exception = HTTPException(
59
+ status_code=status.HTTP_401_UNAUTHORIZED,
60
+ detail="Could not validate credentials",
61
+ headers={"WWW-Authenticate": "Bearer"},
62
+ )
63
+ try:
64
+ payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
65
+ username: str = payload.get("sub")
66
+ if username is None:
67
+ raise credentials_exception
68
+ token_data = TokenData(username=username)
69
+ except JWTError:
70
+ raise credentials_exception
71
+ user = get_user(db, username=token_data.username)
72
+ if user is None:
73
+ raise credentials_exception
74
+ return user
75
+
76
+ async def get_current_active_user(current_user: User = Depends(get_current_user)):
77
+ if not current_user.is_active:
78
+ raise HTTPException(status_code=400, detail="Inactive user")
79
+ return current_user
80
+
81
+ def create_user(db: Session, username: str, email: str, password: str):
82
+ hashed_password = get_password_hash(password)
83
+ db_user = User(username=username, email=email, hashed_password=hashed_password)
84
+ db.add(db_user)
85
+ db.commit()
86
+ db.refresh(db_user)
87
+ return db_user
services/ingredients.py ADDED
@@ -0,0 +1,45 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from sqlalchemy.orm import Session
2
+ from models.ingredient import Ingredient
3
+ from fastapi import HTTPException
4
+ from cachetools import cached, TTLCache
5
+ from typing import List, Dict, Any
6
+ import requests
7
+
8
+ cache = TTLCache(maxsize=100, ttl=300)
9
+
10
+ def get_ingredient_by_name(db: Session, name: str) -> Ingredient:
11
+ return db.query(Ingredient).filter(Ingredient.name == name).first()
12
+
13
+ @cached(cache)
14
+ def fetch_ingredient_data_from_api(name: str) -> Dict[str, Any]:
15
+ url = f"https://api.example.com/ingredients/{name}"
16
+ response = requests.get(url)
17
+ if response.status_code != 200:
18
+ raise HTTPException(status_code=response.status_code, detail=f"Failed to fetch data for ingredient {name}")
19
+ return response.json()
20
+
21
+ def get_ingredient_data(db: Session, name: str) -> Dict[str, Any]:
22
+ ingredient = get_ingredient_by_name(db, name)
23
+ if ingredient:
24
+ return ingredient.nutritional_info
25
+ data = fetch_ingredient_data_from_api(name)
26
+ save_ingredient_data(db, name, data)
27
+ return data
28
+
29
+ def save_ingredient_data(db: Session, name: str, data: Dict[str, Any]):
30
+ ingredient = Ingredient(name=name, nutritional_info=data)
31
+ db.add(ingredient)
32
+ db.commit()
33
+ db.refresh(ingredient)
34
+
35
+ def filter_ingredients_by_preferences(ingredients: List[Dict[str, Any]], preferences: Dict[str, Any]) -> List[Dict[str, Any]]:
36
+ filtered_ingredients = []
37
+ for ingredient in ingredients:
38
+ if preferences.get("low_sugar") and ingredient.get("sugar", 0) > 5:
39
+ continue
40
+ if preferences.get("low_fat") and ingredient.get("fat", 0) > 5:
41
+ continue
42
+ if preferences.get("allergens") and any(allergen in ingredient.get("allergens", []) for allergen in preferences["allergens"]):
43
+ continue
44
+ filtered_ingredients.append(ingredient)
45
+ return filtered_ingredients
services/logging_service.py ADDED
@@ -0,0 +1,38 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import logging
2
+ from logging.handlers import RotatingFileHandler
3
+
4
+ # Configure logging
5
+ logger = logging.getLogger("food_analyzer")
6
+ logger.setLevel(logging.DEBUG)
7
+
8
+ # Create a file handler that logs debug and higher level messages
9
+ file_handler = RotatingFileHandler("food_analyzer.log", maxBytes=5*1024*1024, backupCount=3)
10
+ file_handler.setLevel(logging.DEBUG)
11
+
12
+ # Create a console handler that logs error and higher level messages
13
+ console_handler = logging.StreamHandler()
14
+ console_handler.setLevel(logging.ERROR)
15
+
16
+ # Create a formatter and set it for both handlers
17
+ formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
18
+ file_handler.setFormatter(formatter)
19
+ console_handler.setFormatter(formatter)
20
+
21
+ # Add the handlers to the logger
22
+ logger.addHandler(file_handler)
23
+ logger.addHandler(console_handler)
24
+
25
+ def log_debug(message: str):
26
+ logger.debug(message)
27
+
28
+ def log_info(message: str):
29
+ logger.info(message)
30
+
31
+ def log_warning(message: str):
32
+ logger.warning(message)
33
+
34
+ def log_error(message: str):
35
+ logger.error(message)
36
+
37
+ def log_critical(message: str):
38
+ logger.critical(message)
services/scan_history.py ADDED
@@ -0,0 +1,13 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from sqlalchemy.orm import Session
2
+ from models.scan_history import ScanHistory
3
+ from datetime import datetime
4
+
5
+ def record_scan(db: Session, user_id: int, product_id: int):
6
+ scan_entry = ScanHistory(user_id=user_id, product_id=product_id, scan_date=datetime.utcnow())
7
+ db.add(scan_entry)
8
+ db.commit()
9
+ db.refresh(scan_entry)
10
+ return scan_entry
11
+
12
+ def get_scan_history(db: Session, user_id: int):
13
+ return db.query(ScanHistory).filter(ScanHistory.user_id == user_id).all()
tests/test_ai_agent_service.py ADDED
@@ -0,0 +1,141 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import unittest
2
+ from unittest.mock import patch, MagicMock
3
+ from sqlalchemy.orm import Session
4
+ from services.ai_agent import preprocess_data, validate_data, clean_data, standardize_data, enrich_data, process_data
5
+
6
+ class TestAIAgentService(unittest.TestCase):
7
+
8
+ @patch('services.ai_agent.fetch_product_data_from_api')
9
+ def test_preprocess_data(self, mock_fetch_product_data_from_api):
10
+ mock_fetch_product_data_from_api.return_value = {
11
+ 'product': {
12
+ 'product_name_en': 'Test Product',
13
+ 'generic_name_en': 'Test Generic',
14
+ 'brands': 'Test Brand',
15
+ 'ingredients': [
16
+ {
17
+ 'text': 'Test Ingredient',
18
+ 'percent': 50,
19
+ 'vegan': 'yes',
20
+ 'vegetarian': 'yes',
21
+ 'ingredients': [
22
+ {
23
+ 'text': 'Sub Ingredient',
24
+ 'percent': 25,
25
+ 'vegan': 'yes',
26
+ 'vegetarian': 'yes'
27
+ }
28
+ ]
29
+ }
30
+ ],
31
+ 'ingredients_text_en': 'Test Ingredients Text',
32
+ 'ingredients_analysis': {},
33
+ 'nutriscore': {},
34
+ 'nutrient_levels': {},
35
+ 'nutriments': {},
36
+ 'data_quality_warnings_tags': []
37
+ }
38
+ }
39
+ result = preprocess_data('test_barcode')
40
+ self.assertEqual(result['product_name'], 'Test Product')
41
+ self.assertEqual(result['generic_name'], 'Test Generic')
42
+ self.assertEqual(result['brands'], 'Test Brand')
43
+ self.assertEqual(result['ingredients'][0]['text'], 'Test Ingredient')
44
+ self.assertEqual(result['ingredients'][0]['sub_ingredients'][0]['text'], 'Sub Ingredient')
45
+
46
+ def test_validate_data(self):
47
+ valid_data = {
48
+ 'product_name': 'Test Product',
49
+ 'generic_name': 'Test Generic',
50
+ 'brands': 'Test Brand',
51
+ 'ingredients': [],
52
+ 'nutriscore': {},
53
+ 'nutrient_levels': {},
54
+ 'nutriments': {}
55
+ }
56
+ invalid_data = {
57
+ 'product_name': '',
58
+ 'generic_name': '',
59
+ 'brands': '',
60
+ 'ingredients': [],
61
+ 'nutriscore': {},
62
+ 'nutrient_levels': {},
63
+ 'nutriments': {}
64
+ }
65
+ self.assertTrue(validate_data(valid_data))
66
+ self.assertFalse(validate_data(invalid_data))
67
+
68
+ def test_clean_data(self):
69
+ data = {
70
+ 'ingredients': [
71
+ {
72
+ 'text': 'Test Ingredient',
73
+ 'percent': 'N/A',
74
+ 'sub_ingredients': [
75
+ {
76
+ 'text': 'Sub Ingredient',
77
+ 'percent': 'N/A'
78
+ }
79
+ ]
80
+ }
81
+ ]
82
+ }
83
+ cleaned_data = clean_data(data)
84
+ self.assertEqual(cleaned_data['ingredients'][0]['percent'], 0)
85
+ self.assertEqual(cleaned_data['ingredients'][0]['sub_ingredients'][0]['percent'], 0)
86
+
87
+ def test_standardize_data(self):
88
+ data = {
89
+ 'ingredients': [
90
+ {
91
+ 'text': 'Test Ingredient',
92
+ 'sub_ingredients': [
93
+ {
94
+ 'text': 'Sub Ingredient'
95
+ }
96
+ ]
97
+ }
98
+ ]
99
+ }
100
+ standardized_data = standardize_data(data)
101
+ self.assertEqual(standardized_data['ingredients'][0]['text'], 'test ingredient')
102
+ self.assertEqual(standardized_data['ingredients'][0]['sub_ingredients'][0]['text'], 'sub ingredient')
103
+
104
+ @patch('services.ai_agent.get_ingredient_by_name')
105
+ @patch('services.ai_agent.fetch_product_data_from_api')
106
+ @patch('services.ai_agent.save_ingredient_data')
107
+ def test_enrich_data(self, mock_save_ingredient_data, mock_fetch_product_data_from_api, mock_get_ingredient_by_name):
108
+ db = MagicMock(spec=Session)
109
+ data = {
110
+ 'ingredients': [
111
+ {
112
+ 'text': 'Test Ingredient',
113
+ 'sub_ingredients': []
114
+ }
115
+ ]
116
+ }
117
+ mock_get_ingredient_by_name.return_value = None
118
+ mock_fetch_product_data_from_api.return_value = {'nutritional_info': 'Test Info'}
119
+ enriched_data = enrich_data(db, data)
120
+ self.assertEqual(enriched_data['ingredients'][0]['nutritional_info'], {'nutritional_info': 'Test Info'})
121
+ mock_save_ingredient_data.assert_called_once_with(db, 'test ingredient', {'nutritional_info': 'Test Info'})
122
+
123
+ @patch('services.ai_agent.preprocess_data')
124
+ @patch('services.ai_agent.validate_data')
125
+ @patch('services.ai_agent.clean_data')
126
+ @patch('services.ai_agent.standardize_data')
127
+ @patch('services.ai_agent.enrich_data')
128
+ @patch('services.ai_agent.save_json_file')
129
+ def test_process_data(self, mock_save_json_file, mock_enrich_data, mock_standardize_data, mock_clean_data, mock_validate_data, mock_preprocess_data):
130
+ db = MagicMock(spec=Session)
131
+ mock_preprocess_data.return_value = {'product_name': 'Test Product'}
132
+ mock_validate_data.return_value = True
133
+ mock_clean_data.return_value = {'product_name': 'Test Product'}
134
+ mock_standardize_data.return_value = {'product_name': 'Test Product'}
135
+ mock_enrich_data.return_value = {'product_name': 'Test Product'}
136
+ result = process_data(db, 'test_barcode')
137
+ self.assertEqual(result['product_name'], 'Test Product')
138
+ mock_save_json_file.assert_called_once_with('test_barcode', {'product_name': 'Test Product'})
139
+
140
+ if __name__ == '__main__':
141
+ unittest.main()
tests/test_analysis_agent_service.py ADDED
@@ -0,0 +1,56 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import unittest
2
+ from unittest.mock import MagicMock
3
+ from sqlalchemy.orm import Session
4
+ from services.analysis_agent import analyze_ingredients, provide_personalized_recommendations
5
+ from models.user_preferences import UserPreferences
6
+ from models.ingredient import Ingredient
7
+
8
+ class TestAnalysisAgentService(unittest.TestCase):
9
+
10
+ def setUp(self):
11
+ self.db = MagicMock(spec=Session)
12
+ self.user_id = 1
13
+ self.ingredients = [
14
+ {"text": "sugar"},
15
+ {"text": "salt"},
16
+ {"text": "flour"}
17
+ ]
18
+ self.preferences = UserPreferences(
19
+ user_id=self.user_id,
20
+ dietary_restrictions="low sugar",
21
+ allergens="",
22
+ preferred_ingredients="",
23
+ disliked_ingredients=""
24
+ )
25
+ self.db.query.return_value.filter.return_value.first.return_value = self.preferences
26
+
27
+ def test_analyze_ingredients(self):
28
+ self.db.query.return_value.filter.return_value.first.return_value = self.preferences
29
+ self.db.query.return_value.all.return_value = [
30
+ Ingredient(name="sugar", nutritional_info={"calories": 100}),
31
+ Ingredient(name="salt", nutritional_info={"sodium": 200}),
32
+ Ingredient(name="flour", nutritional_info={"carbs": 300})
33
+ ]
34
+
35
+ result = analyze_ingredients(self.db, self.ingredients, self.user_id)
36
+
37
+ self.assertIn("safe_ingredients", result)
38
+ self.assertIn("unsafe_ingredients", result)
39
+ self.assertIn("additional_facts", result)
40
+ self.assertEqual(len(result["safe_ingredients"]), 2)
41
+ self.assertEqual(len(result["unsafe_ingredients"]), 1)
42
+
43
+ def test_provide_personalized_recommendations(self):
44
+ self.db.query.return_value.all.return_value = [
45
+ Ingredient(name="sugar", nutritional_info={"calories": 100}),
46
+ Ingredient(name="salt", nutritional_info={"sodium": 200}),
47
+ Ingredient(name="flour", nutritional_info={"carbs": 300})
48
+ ]
49
+
50
+ result = provide_personalized_recommendations(self.db, self.user_id)
51
+
52
+ self.assertIn("recommended_ingredients", result)
53
+ self.assertEqual(len(result["recommended_ingredients"]), 3)
54
+
55
+ if __name__ == "__main__":
56
+ unittest.main()
tests/test_auth_service.py ADDED
@@ -0,0 +1,55 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import pytest
2
+ from fastapi.testclient import TestClient
3
+ from sqlalchemy import create_engine
4
+ from sqlalchemy.orm import sessionmaker
5
+ from database import Base, get_db
6
+ from main import app
7
+ from services.auth_service import create_user, get_password_hash
8
+ from models.user import User
9
+
10
+ SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
11
+ engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False})
12
+ TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
13
+
14
+ Base.metadata.create_all(bind=engine)
15
+
16
+ def override_get_db():
17
+ try:
18
+ db = TestingSessionLocal()
19
+ yield db
20
+ finally:
21
+ db.close()
22
+
23
+ app.dependency_overrides[get_db] = override_get_db
24
+
25
+ client = TestClient(app)
26
+
27
+ @pytest.fixture(scope="module")
28
+ def test_db():
29
+ Base.metadata.create_all(bind=engine)
30
+ db = TestingSessionLocal()
31
+ yield db
32
+ db.close()
33
+ Base.metadata.drop_all(bind=engine)
34
+
35
+ def test_register_user(test_db):
36
+ response = client.post("/auth/register", json={"username": "testuser", "email": "testuser@example.com", "password": "testpassword"})
37
+ assert response.status_code == 200
38
+ assert "access_token" in response.json()
39
+ assert response.json()["token_type"] == "bearer"
40
+
41
+ def test_login_user(test_db):
42
+ create_user(test_db, "testuser", "testuser@example.com", get_password_hash("testpassword"))
43
+ response = client.post("/auth/login", data={"username": "testuser", "password": "testpassword"})
44
+ assert response.status_code == 200
45
+ assert "access_token" in response.json()
46
+ assert response.json()["token_type"] == "bearer"
47
+
48
+ def test_get_current_user(test_db):
49
+ create_user(test_db, "testuser", "testuser@example.com", get_password_hash("testpassword"))
50
+ response = client.post("/auth/login", data={"username": "testuser", "password": "testpassword"})
51
+ token = response.json()["access_token"]
52
+ response = client.get("/auth/users/me", headers={"Authorization": f"Bearer {token}"})
53
+ assert response.status_code == 200
54
+ assert response.json()["username"] == "testuser"
55
+ assert response.json()["email"] == "testuser@example.com"
tests/test_ingredients_service.py ADDED
@@ -0,0 +1,52 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import unittest
2
+ from unittest.mock import patch, MagicMock
3
+ from sqlalchemy.orm import Session
4
+ from services.ingredients import get_ingredient_data, save_ingredient_data, filter_ingredients_by_preferences
5
+
6
+ class TestIngredientService(unittest.TestCase):
7
+
8
+ @patch('services.ingredients.get_ingredient_by_name')
9
+ @patch('services.ingredients.fetch_ingredient_data_from_api')
10
+ @patch('services.ingredients.save_ingredient_data')
11
+ def test_get_ingredient_data(self, mock_save_ingredient_data, mock_fetch_ingredient_data_from_api, mock_get_ingredient_by_name):
12
+ db = MagicMock(spec=Session)
13
+ mock_get_ingredient_by_name.return_value = None
14
+ mock_fetch_ingredient_data_from_api.return_value = {"name": "test_ingredient", "nutritional_info": "test_info"}
15
+
16
+ result = get_ingredient_data(db, "test_ingredient")
17
+
18
+ mock_get_ingredient_by_name.assert_called_once_with(db, "test_ingredient")
19
+ mock_fetch_ingredient_data_from_api.assert_called_once_with("test_ingredient")
20
+ mock_save_ingredient_data.assert_called_once_with(db, "test_ingredient", {"name": "test_ingredient", "nutritional_info": "test_info"})
21
+ self.assertEqual(result, {"name": "test_ingredient", "nutritional_info": "test_info"})
22
+
23
+ def test_save_ingredient_data(self):
24
+ db = MagicMock(spec=Session)
25
+ name = "test_ingredient"
26
+ data = {"name": "test_ingredient", "nutritional_info": "test_info"}
27
+
28
+ save_ingredient_data(db, name, data)
29
+
30
+ db.add.assert_called_once()
31
+ db.commit.assert_called_once()
32
+ db.refresh.assert_called_once()
33
+
34
+ def test_filter_ingredients_by_preferences(self):
35
+ ingredients = [
36
+ {"text": "ingredient1", "sugar": 10, "fat": 5, "allergens": ["allergen1"]},
37
+ {"text": "ingredient2", "sugar": 3, "fat": 2, "allergens": []},
38
+ {"text": "ingredient3", "sugar": 6, "fat": 1, "allergens": ["allergen2"]}
39
+ ]
40
+ preferences = {
41
+ "low_sugar": True,
42
+ "low_fat": True,
43
+ "allergens": ["allergen1"]
44
+ }
45
+
46
+ result = filter_ingredients_by_preferences(ingredients, preferences)
47
+
48
+ self.assertEqual(len(result), 1)
49
+ self.assertEqual(result[0]["text"], "ingredient2")
50
+
51
+ if __name__ == '__main__':
52
+ unittest.main()
tests/test_scan_history_service.py ADDED
@@ -0,0 +1,43 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import unittest
2
+ from unittest.mock import MagicMock
3
+ from datetime import datetime
4
+ from sqlalchemy.orm import Session
5
+ from models.scan_history import ScanHistory
6
+ from services.scan_history import record_scan, get_scan_history
7
+
8
+ class TestScanHistoryService(unittest.TestCase):
9
+
10
+ def setUp(self):
11
+ self.db = MagicMock(spec=Session)
12
+ self.user_id = 1
13
+ self.product_id = 123
14
+ self.scan_date = datetime.utcnow()
15
+
16
+ def test_record_scan(self):
17
+ scan_entry = ScanHistory(user_id=self.user_id, product_id=self.product_id, scan_date=self.scan_date)
18
+ self.db.add.return_value = None
19
+ self.db.commit.return_value = None
20
+ self.db.refresh.return_value = None
21
+
22
+ result = record_scan(self.db, self.user_id, self.product_id)
23
+
24
+ self.db.add.assert_called_once_with(scan_entry)
25
+ self.db.commit.assert_called_once()
26
+ self.db.refresh.assert_called_once_with(scan_entry)
27
+ self.assertEqual(result.user_id, self.user_id)
28
+ self.assertEqual(result.product_id, self.product_id)
29
+ self.assertEqual(result.scan_date, self.scan_date)
30
+
31
+ def test_get_scan_history(self):
32
+ scan_entry = ScanHistory(user_id=self.user_id, product_id=self.product_id, scan_date=self.scan_date)
33
+ self.db.query.return_value.filter.return_value.all.return_value = [scan_entry]
34
+
35
+ result = get_scan_history(self.db, self.user_id)
36
+
37
+ self.db.query.assert_called_once_with(ScanHistory)
38
+ self.db.query.return_value.filter.assert_called_once_with(ScanHistory.user_id == self.user_id)
39
+ self.db.query.return_value.filter.return_value.all.assert_called_once()
40
+ self.assertEqual(result, [scan_entry])
41
+
42
+ if __name__ == '__main__':
43
+ unittest.main()
utils.py CHANGED
@@ -2,6 +2,8 @@ import json
2
  import os
3
  import requests
4
  from fastapi import HTTPException
 
 
5
 
6
  def fetch_product_data_from_api(barcode):
7
  url = f"https://india.openfoodfacts.org/api/v2/product/{barcode}.json"
@@ -14,4 +16,21 @@ def save_json_file(item, data):
14
  os.makedirs("v2", exist_ok=True)
15
  with open(f"v2/{item}.json", "w") as file:
16
  json.dump(data, file)
17
- print(f"Saved {item}.json")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2
  import os
3
  import requests
4
  from fastapi import HTTPException
5
+ import cv2
6
+ import pytesseract
7
 
8
  def fetch_product_data_from_api(barcode):
9
  url = f"https://india.openfoodfacts.org/api/v2/product/{barcode}.json"
 
16
  os.makedirs("v2", exist_ok=True)
17
  with open(f"v2/{item}.json", "w") as file:
18
  json.dump(data, file)
19
+ print(f"Saved {item}.json")
20
+
21
+ def extract_text_from_image(image_path):
22
+ image = cv2.imread(image_path)
23
+ gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
24
+ gray = cv2.medianBlur(gray, 3)
25
+ text = pytesseract.image_to_string(gray)
26
+ return text
27
+
28
+ def detect_barcode_from_image(image_path):
29
+ image = cv2.imread(image_path)
30
+ gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
31
+ gray = cv2.medianBlur(gray, 3)
32
+ detector = cv2.QRCodeDetector()
33
+ data, bbox, _ = detector.detectAndDecode(gray)
34
+ if bbox is not None:
35
+ return data
36
+ return None