| from datetime import datetime |
| from typing import Annotated |
| from fastapi import Depends, APIRouter |
| from sqlmodel import Session |
| from models import User, UserPublicMe, UserPublicMeWith, UserUpdateMe |
| from models import Site, SiteCreate, SitePublicMe, SitePublicMeWith, SiteUpdate |
| from models import Guest, GuestCreateMe, GuestPublicMe, GuestPublicWith, GuestUpdateMe, SitePublicMeWithHosts |
| from core import crud, utils |
|
|
# Router for the authenticated user's dashboard; every route below is
# registered under the "/dashboard" prefix and tagged "dashboard".
router = APIRouter(prefix="/dashboard", tags=["dashboard"])
|
|
|
|
| |
| |
| |
|
|
|
|
@router.get("/", response_model=UserPublicMeWith)
def read_dashboard(
    *,
    session: Session = Depends(utils.get_session),
    current_user: Annotated[User, Depends(crud.get_current_active_user)]
):
    # GET /dashboard/ -- return the authenticated user, serialized through
    # UserPublicMeWith.
    # The user is attached to the request session before being returned.
    # NOTE(review): presumably so related objects can still be loaded while
    # the response model is built -- confirm.
    session.add(current_user)
    return current_user
|
|
|
|
@router.patch("/", response_model=UserPublicMe)
def update_dashboard(
    *,
    session: Session = Depends(utils.get_session),
    current_user: Annotated[User, Depends(crud.get_current_active_user)],
    user: UserUpdateMe
):
    # PATCH /dashboard/ -- apply the request payload to the authenticated
    # user. All of the work is delegated to crud.edit_user, whose return
    # value is sent back through UserPublicMe.
    updated_user = crud.edit_user(session, current_user, user)
    return updated_user
|
|
|
|
|
|
@router.delete("/")
def delete_dashboard(
    *,
    session: Session = Depends(utils.get_session),
    current_user: Annotated[User, Depends(crud.get_current_active_user)],
):
    # DELETE /dashboard/ -- remove the authenticated user's own account and
    # commit, then acknowledge with a fixed payload.
    session.delete(current_user)
    session.commit()
    acknowledgement = {"ok": True}
    return acknowledgement
|
|
|
|
| |
| |
| |
|
|
@router.post("/sites", response_model=SitePublicMe)
def create_site(
    *,
    session: Session = Depends(utils.get_session),
    current_user: Annotated[User, Depends(crud.get_current_active_user)],
    site: SiteCreate
):
    # POST /dashboard/sites -- create a site from the payload and link it to
    # the authenticated user.
    # NOTE(review): camera_exists is presumably a guard that rejects a
    # duplicate camera by raising -- confirm in core.crud.
    crud.camera_exists(session, site)

    # Take a single timestamp so created_at and updated_at are exactly equal
    # on a brand-new row (two separate now() calls differ by microseconds).
    now = datetime.now()
    db_site = Site.model_validate(
        site,
        update={"created_at": now, "updated_at": now},
    )
    db_site = crud.push_site(session, db_site)

    # Attach the user to this session before touching its `sites`
    # relationship, then persist the new association.
    session.add(current_user)
    current_user.sites.append(db_site)
    session.commit()
    return db_site
|
|
|
|
|
|
@router.get("/sites/{site_id}", response_model=SitePublicMeWith)
def read_site(
    *,
    session: Session = Depends(utils.get_session),
    current_user: Annotated[User, Depends(crud.get_current_active_user)],
    site_id: int
):
    # GET /dashboard/sites/{site_id} -- return one site, looked up through
    # crud.get_current_site with the authenticated user.
    # NOTE(review): the helper presumably rejects sites the user does not
    # own -- confirm in core.crud.
    return crud.get_current_site(session, current_user, site_id)
|
|
|
|
|
|
@router.patch("/sites/{site_id}", response_model=SitePublicMe)
def update_site(
    *,
    session: Session = Depends(utils.get_session),
    current_user: Annotated[User, Depends(crud.get_current_active_user)],
    site_id: int,
    site: SiteUpdate
):
    # PATCH /dashboard/sites/{site_id} -- partially update a site and bump
    # its updated_at timestamp.
    stored_site = crud.get_current_site(session, current_user, site_id)

    # NOTE(review): the camera check is given the stored site, not the
    # incoming payload -- confirm this is the intended argument.
    crud.camera_exists(session, stored_site)

    # Only fields the client actually sent are applied (exclude_unset).
    stored_site.sqlmodel_update(
        site.model_dump(exclude_unset=True),
        update={"updated_at": datetime.now()},
    )
    return crud.push_site(session, stored_site)
|
|
|
|
|
|
@router.delete("/sites/{site_id}")
def delete_site(
    *,
    session: Session = Depends(utils.get_session),
    current_user: Annotated[User, Depends(crud.get_current_active_user)],
    site_id: int
):
    # DELETE /dashboard/sites/{site_id} -- delete the site returned by
    # crud.get_current_site for this user, commit, and acknowledge.
    doomed_site = crud.get_current_site(session, current_user, site_id)
    session.delete(doomed_site)
    session.commit()
    acknowledgement = {"ok": True}
    return acknowledgement
|
|
|
|
| |
| |
| |
|
|
|
|
# The path placeholder must be spelled exactly like the `site_id` parameter;
# otherwise FastAPI treats `site_id` as a required query parameter and the
# value in the URL path is ignored.
@router.post("/sites/{site_id}/hosts", response_model=GuestPublicMe)
def create_host(
    *,
    session: Session = Depends(utils.get_session),
    current_user: Annotated[User, Depends(crud.get_current_active_user)],
    site_id: int,
    host: GuestCreateMe
):
    # POST /dashboard/sites/{site_id}/hosts -- create a host (stored as a
    # Guest row) attached to the given site.

    # The lookup result is not needed; the call is made only for its check.
    # NOTE(review): presumably it raises when the site is missing or not
    # owned by the user -- confirm in core.crud.
    crud.get_current_site(session, current_user, site_id)

    # The vector check runs only when the payload carries a vector.
    if host.vector is not None:
        crud.vector_exists(session, host)

    # site_id comes from the URL, not from the request body.
    db_host = Guest.model_validate(host, update={"site_id": site_id})
    session.add(db_host)
    session.commit()
    # Reload the row so database-generated values are present in the response.
    session.refresh(db_host)
    return db_host
|
|
|
|
|
|
@router.get("/sites/{site_id}/hosts", response_model=SitePublicMeWithHosts)
def read_hosts(
    *,
    session: Session = Depends(utils.get_session),
    current_user: Annotated[User, Depends(crud.get_current_active_user)],
    site_id: int,
):
    # GET /dashboard/sites/{site_id}/hosts -- return the site itself; the
    # SitePublicMeWithHosts response model decides what is exposed.
    return crud.get_current_site(session, current_user, site_id)
|
|
|
|
|
|
@router.get("/sites/{site_id}/hosts/{host_id}", response_model=GuestPublicWith)
def read_host(
    *,
    session: Session = Depends(utils.get_session),
    current_user: Annotated[User, Depends(crud.get_current_active_user)],
    site_id: int,
    host_id: int,
):
    # GET /dashboard/sites/{site_id}/hosts/{host_id} -- resolve the site for
    # this user first, then fetch the requested host through that site.
    parent_site = crud.get_current_site(session, current_user, site_id)
    requested_host = crud.get_host_of_site(session, parent_site, host_id)
    return requested_host
|
|
|
|
|
|
@router.patch("/sites/{site_id}/hosts/{host_id}", response_model=GuestPublicMe)
def update_host(
    *,
    session: Session = Depends(utils.get_session),
    current_user: Annotated[User, Depends(crud.get_current_active_user)],
    site_id: int,
    host_id: int,
    host: GuestUpdateMe
):
    # PATCH /dashboard/sites/{site_id}/hosts/{host_id} -- partially update a
    # host that belongs to the given site.
    parent_site = crud.get_current_site(session, current_user, site_id)
    stored_host = crud.get_host_of_site(session, parent_site, host_id)

    # The vector check runs only when the payload carries a vector.
    if host.vector is not None:
        crud.vector_exists(session, host)

    # Apply only the fields the client sent, then force site_id from the URL
    # and refresh the modification timestamp.
    stored_host.sqlmodel_update(
        host.model_dump(exclude_unset=True),
        update={"site_id": site_id, "updated_at": datetime.now()},
    )
    session.add(stored_host)
    session.commit()
    session.refresh(stored_host)
    return stored_host
|
|
|
|
@router.delete("/sites/{site_id}/hosts/{host_id}")
def delete_host(
    *,
    session: Session = Depends(utils.get_session),
    current_user: Annotated[User, Depends(crud.get_current_active_user)],
    site_id: int,
    host_id: int
):
    # DELETE /dashboard/sites/{site_id}/hosts/{host_id} -- resolve the site
    # for this user, fetch the host through it, delete it and commit.
    parent_site = crud.get_current_site(session, current_user, site_id)
    doomed_host = crud.get_host_of_site(session, parent_site, host_id)
    session.delete(doomed_host)
    session.commit()
    acknowledgement = {"ok": True}
    return acknowledgement