Spaces:
Runtime error
Runtime error
Refactor Dockerfile and clean up unused schemas; update Redis client initialization and environment configuration
a3aa6c1
| from datetime import datetime | |
| from src.models import Session | |
| from src.schemas import SessionCreateSchema | |
| from src.repositories import SessionRepository | |
class SessionService:
    """Service layer that delegates session persistence to ``SessionRepository``."""

    def __init__(self):
        """Create the repository instance every method of this service uses."""
        self.session_repository = SessionRepository()

    async def create_session(self, session_data: SessionCreateSchema):
        """Build a ``Session`` from *session_data* and insert it.

        Copies ``user_id``, ``token``, ``device_ipv4_address`` and
        ``expired_at`` from the schema onto a new ``Session`` and returns
        whatever the repository's ``insert_one`` returns.
        """
        session = Session(
            user_id=session_data.user_id,
            token=session_data.token,
            device_ipv4_address=session_data.device_ipv4_address,
            expired_at=session_data.expired_at,
        )
        return await self.session_repository.insert_one(session)

    async def update_session(self, session_id, session_data: SessionCreateSchema):
        """Forward *session_id* and *session_data* to the repository's ``update``."""
        return await self.session_repository.update(session_id, session_data)

    async def delete_session(self, session_id):
        """Forward *session_id* to the repository's ``delete``."""
        return await self.session_repository.delete(session_id)

    async def get_session_by_token(self, token: str):
        """Return the first session whose ``token`` equals *token*, or ``None``.

        The lookup goes through ``get_all(filter_by={"token": token})``; an
        empty (falsy) result yields ``None``.
        """
        sessions = await self.session_repository.get_all(filter_by={"token": token})
        return sessions[0] if sessions else None

    async def get_session(self, session_id):
        """Return the repository's ``get_by_id`` result for *session_id*."""
        return await self.session_repository.get_by_id(session_id)

    async def is_session_expired(self, token: str) -> bool:
        """Return ``True`` if no session matches *token* or its expiry has passed.

        A missing session is reported as expired. Otherwise the session's
        ``expired_at`` is compared against the current time.
        """
        session = await self.get_session_by_token(token)
        if not session:
            return True
        expires_at = session.expired_at
        # Take "now" in the same timezone flavour as the stored timestamp:
        # tzinfo=None gives the naive local time (the previous behaviour),
        # while an aware ``expired_at`` gets an aware "now". Comparing a naive
        # and an aware datetime with ``<`` would raise TypeError.
        now = datetime.now(tz=getattr(expires_at, "tzinfo", None))
        return expires_at < now