Spaces:
Runtime error
Runtime error
File size: 1,408 Bytes
d2b1eb6 5e2b9a0 a3aa6c1 5e2b9a0 d2b1eb6 862aa9a d2b1eb6 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
from datetime import datetime
from src.models import Session
from src.schemas import SessionCreateSchema
from src.repositories import SessionRepository
class SessionService:
    """Service layer that delegates session persistence to a SessionRepository."""

    def __init__(self):
        """Create the service with its own SessionRepository instance."""
        self.session_repository = SessionRepository()

    async def create_session(self, session_data: "SessionCreateSchema"):
        """Build a Session from *session_data* and insert it via the repository.

        The ``user_id``, ``token``, ``device_ipv4_address`` and ``expired_at``
        fields of *session_data* are copied onto the new ``Session``.

        Returns:
            Whatever ``SessionRepository.insert_one`` returns.
        """
        session = Session(
            user_id=session_data.user_id,
            token=session_data.token,
            device_ipv4_address=session_data.device_ipv4_address,
            expired_at=session_data.expired_at,
        )
        return await self.session_repository.insert_one(session)

    async def update_session(self, session_id, session_data: "SessionCreateSchema"):
        """Forward *session_id* and *session_data* to ``SessionRepository.update``.

        Returns:
            Whatever ``SessionRepository.update`` returns.
        """
        return await self.session_repository.update(session_id, session_data)

    async def delete_session(self, session_id):
        """Forward *session_id* to ``SessionRepository.delete`` and return its result."""
        return await self.session_repository.delete(session_id)

    async def get_session_by_token(self, token: str):
        """Return the first session whose ``token`` equals *token*, else ``None``.

        The repository is queried with ``filter_by={"token": token}``; an
        empty (falsy) result yields ``None``.
        """
        matches = await self.session_repository.get_all(filter_by={"token": token})
        return matches[0] if matches else None

    async def get_session(self, session_id):
        """Return the result of ``SessionRepository.get_by_id`` for *session_id*."""
        return await self.session_repository.get_by_id(session_id)

    async def is_session_expired(self, token: str) -> bool:
        """Report whether the session identified by *token* has expired.

        A token with no matching session is reported as expired.

        Returns:
            ``True`` when no session matches or its ``expired_at`` lies before
            the current time; ``False`` otherwise.
        """
        session = await self.get_session_by_token(token)
        if not session:
            return True
        expires_at = session.expired_at
        # Take "now" in the same flavour as the stored timestamp: tz=None
        # yields naive local time (the previous behaviour), while a tz-aware
        # ``expired_at`` gets an aware "now". Comparing a naive datetime with
        # an aware one raises TypeError.
        now = datetime.now(tz=expires_at.tzinfo)
        return expires_at < now
|