Spaces:
Sleeping
Sleeping
File size: 3,085 Bytes
8074bec 721760c 8074bec 721760c 8074bec 721760c 8074bec 721760c 8074bec 721760c 8074bec |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
import uuid
from datetime import date, datetime
from typing import List, Optional

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select

from .models import WaterLogs
from .schemas import WaterLogCreate, WaterLogUpdate
# Create or update today's water log
async def create_water_log(
    session: AsyncSession, water_log: WaterLogCreate, user_id: uuid.UUID
) -> WaterLogs:
    """Create today's water log for a user, or update it if one exists.

    "Today" is the current UTC calendar day, so the lookup window uses the
    same clock as the ``logged_at`` value written when a log is created.

    Args:
        session: Async database session used to query and persist the log.
        water_log: Payload providing ``amount_ml``, ``goal_ml`` and
            ``recommended_ml``.
        user_id: Id of the user the log belongs to.

    Returns:
        The refreshed ``WaterLogs`` row that was updated or newly inserted.
    """
    # Take a single UTC timestamp and derive the day window from it. Mixing
    # the server-local date with a UTC ``logged_at`` would let a freshly
    # created row fall outside "today" on non-UTC hosts and cause duplicates.
    now = datetime.utcnow()
    day_start = datetime.combine(now.date(), datetime.min.time())
    day_end = datetime.combine(now.date(), datetime.max.time())

    # Pick the most recent row of the day; LIMIT 1 + first() tolerates
    # pre-existing duplicates instead of raising MultipleResultsFound.
    stmt = (
        select(WaterLogs)
        .filter(
            WaterLogs.user_id == user_id,
            WaterLogs.logged_at >= day_start,
            WaterLogs.logged_at <= day_end,
        )
        .order_by(WaterLogs.logged_at.desc())
        .limit(1)
    )
    result = await session.execute(stmt)
    existing_log = result.scalars().first()

    # A log for today already exists: overwrite its values in place.
    if existing_log is not None:
        existing_log.amount_ml = water_log.amount_ml
        existing_log.goal_ml = water_log.goal_ml
        existing_log.recommended_ml = water_log.recommended_ml
        await session.commit()
        await session.refresh(existing_log)
        return existing_log

    # No log for today yet: insert a new one stamped with the same UTC time.
    new_log = WaterLogs(
        id=uuid.uuid4(),
        user_id=user_id,
        amount_ml=water_log.amount_ml,
        logged_at=now,
        goal_ml=water_log.goal_ml,
        recommended_ml=water_log.recommended_ml,
    )
    session.add(new_log)
    await session.commit()
    await session.refresh(new_log)
    return new_log
# Get all water logs for a user
async def get_water_logs(
    session: AsyncSession, user_id: uuid.UUID, skip: int = 0, limit: int = 100
) -> List[WaterLogs]:
    """Return one page of a user's water logs.

    Args:
        session: Async database session used to run the query.
        user_id: Id of the user whose logs are fetched.
        skip: Number of rows to skip (OFFSET).
        limit: Maximum number of rows to return (LIMIT).

    Returns:
        A list of ``WaterLogs`` rows ordered by ``logged_at`` and then ``id``;
        empty when the user has no logs in the requested page.
    """
    # OFFSET/LIMIT without ORDER BY gives no stable row order, so consecutive
    # pages could repeat or skip rows. ``id`` breaks ties on equal timestamps.
    stmt = (
        select(WaterLogs)
        .filter(WaterLogs.user_id == user_id)
        .order_by(WaterLogs.logged_at, WaterLogs.id)
        .offset(skip)
        .limit(limit)
    )
    result = await session.execute(stmt)
    # Materialize as a plain list to match the declared return type.
    return list(result.scalars().all())
# Update a water log
async def update_water_log(
    session: AsyncSession, water_log_id: uuid.UUID, water_log: WaterLogUpdate
) -> Optional[WaterLogs]:
    """Overwrite the values of an existing water log.

    Args:
        session: Async database session used to query and persist the log.
        water_log_id: Primary key of the log to update.
        water_log: Payload providing ``amount_ml``, ``goal_ml`` and
            ``recommended_ml``.

    Returns:
        The refreshed ``WaterLogs`` row, or ``None`` when no row has the
        given id.
    """
    stmt = select(WaterLogs).filter(WaterLogs.id == water_log_id)
    result = await session.execute(stmt)
    db_water_log = result.scalar_one_or_none()

    # Nothing to update: signal "not found" to the caller.
    if db_water_log is None:
        return None

    # NOTE(review): all three fields are copied unconditionally; if
    # WaterLogUpdate allows omitted fields, they would be written as None.
    # Confirm against the schema definition.
    db_water_log.amount_ml = water_log.amount_ml
    db_water_log.goal_ml = water_log.goal_ml
    db_water_log.recommended_ml = water_log.recommended_ml
    await session.commit()
    await session.refresh(db_water_log)
    return db_water_log
# Delete a water log
async def delete_water_log(session: AsyncSession, water_log_id: uuid.UUID) -> bool:
    """Delete the water log with the given id.

    Args:
        session: Async database session used to look up and delete the row.
        water_log_id: Primary key of the log to delete.

    Returns:
        ``True`` when a matching row was found, deleted and committed;
        ``False`` when no row has that id.
    """
    lookup = select(WaterLogs).filter(WaterLogs.id == water_log_id)
    outcome = await session.execute(lookup)
    target = outcome.scalar_one_or_none()

    # Guard clause: nothing matched, so there is nothing to delete.
    if not target:
        return False

    await session.delete(target)
    await session.commit()
    return True
|