Spaces:
Runtime error
Runtime error
| import datetime | |
| from datetime import datetime | |
| from typing import Optional | |
| from sqlalchemy import select | |
| from sqlalchemy.orm import joinedload | |
| from src.models import ContentDeliveryFrequency, User | |
| from src.repositories import UserRepository | |
| from src.services._slack_bot_service import SlackBotService | |
| from src.utils import Helper, logger | |
class UserService:
    """Asynchronous service layer for user records and Slack messaging.

    All persistence goes through the ``UserRepository`` instance created in
    ``__init__``. The class can be used as an async context manager;
    entering returns the service itself and exiting performs no cleanup.

    Methods:
        register_user(user: dict) -> dict:
            Passes the user data to ``UserRepository.create``.
        get_user_info_by_email(email: str):
            Looks a user up through ``UserRepository.get_user_by_email``.
        update_user_info(user_id, info):
            Patches the user with ``info`` and resets
            ``next_content_delivery_date`` to the current time.
        get_users_list():
            Returns whatever ``UserRepository.get_all`` yields.
        get_user_last_message_time(user_id):
            Queries the repository with a ``last_message_time`` filter.
        update_user_last_message_time(user_id):
            Patches ``last_message_time`` with the current time.
        get_user_frequency(user_id):
            Queries the repository with a ``content_delivery_frequency`` filter.
        get_user_by_slack_id(slack_id: str):
            Looks a user up through ``UserRepository.get_user_by_slack_id``.
        update_user_frequency(user_id: int, frequency: ContentDeliveryFrequency) -> Optional[User]:
            Stores a new delivery frequency together with the recalculated
            next delivery date; returns ``None`` when the user is missing.
        send_message(user_id: int, message: str):
            Sends a Slack direct message to the user and reports the outcome.
    """

    def __init__(self):
        self.user_repository = UserRepository()

    async def __aenter__(self):
        """Return the service itself; nothing is acquired on entry."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Do nothing on exit; exceptions propagate to the caller."""
        pass

    async def register_user(self, user: dict) -> dict:
        """Create a user from ``user`` and return the repository's result."""
        return await self.user_repository.create(user)

    async def get_user_info_by_email(self, email: str):
        """Return the repository's lookup result for ``email``."""
        return await self.user_repository.get_user_by_email(email=email)

    async def update_user_info(self, user_id, info):
        """Patch a user with ``info`` and reset the next delivery date to now.

        Args:
            user_id: Identifier forwarded to ``UserRepository.patch``.
            info: Mapping of fields to update. It is not modified.

        Returns:
            Whatever ``UserRepository.patch`` returns.
        """
        # Build a fresh payload instead of writing into ``info`` so the
        # caller's dict is not silently changed by this call.
        payload = {**info, "next_content_delivery_date": datetime.now()}
        return await self.user_repository.patch(user_id, payload)

    async def get_users_list(self):
        """Return the result of ``UserRepository.get_all``."""
        return await self.user_repository.get_all()

    async def get_user_last_message_time(self, user_id):
        """Fetch the user via the repository with a ``last_message_time`` filter.

        How the repository interprets ``filter_by`` is not visible from this
        class; the result is returned unchanged.
        """
        return await self.user_repository.get(
            user_id, filter_by={"last_message_time": True}
        )

    async def update_user_last_message_time(self, user_id):
        """Set the user's ``last_message_time`` to the current time.

        Returns:
            Whatever ``UserRepository.patch`` returns.
        """
        # Use the same positional ``(id, data)`` call shape as the other
        # ``patch`` call sites in this class rather than keyword arguments.
        return await self.user_repository.patch(
            user_id, {"last_message_time": datetime.now()}
        )

    async def get_user_frequency(self, user_id):
        """Fetch the user via the repository with a ``content_delivery_frequency`` filter.

        How the repository interprets ``filter_by`` is not visible from this
        class; the result is returned unchanged.
        """
        return await self.user_repository.get(
            user_id, filter_by={"content_delivery_frequency": True}
        )

    async def get_user_by_slack_id(self, slack_id: str):
        """Get user by Slack ID"""
        return await self.user_repository.get_user_by_slack_id(slack_id)

    async def update_user_frequency(
        self, user_id: int, frequency: ContentDeliveryFrequency
    ) -> Optional[User]:
        """Update user's content delivery frequency and next delivery date.

        Args:
            user_id: Primary key of the user to update.
            frequency: New content delivery frequency.

        Returns:
            The result of ``UserRepository.patch``, or ``None`` when no user
            with ``user_id`` exists.
        """
        async with self.user_repository.get_session() as session:
            user = await session.get(User, user_id)
            if not user:
                return None
            # Read the timestamp while the session is still open.
            # NOTE(review): this reads ``last_message_sent_at`` while other
            # methods here use ``last_message_time`` - confirm both exist on User.
            last_sent_at = user.last_message_sent_at

        async with Helper() as helper:
            next_delivery_date = helper.calculate_next_delivery_date(
                last_sent_at, frequency
            )

        update_data = {
            "content_delivery_frequency": frequency,
            "next_content_delivery_date": next_delivery_date,
        }
        return await self.user_repository.patch(user_id, update_data)

    async def send_message(self, user_id: int, message: str):
        """Send message to user via a Slack direct message.

        Args:
            user_id: Primary key of the recipient.
            message: Text to send.

        Returns:
            A dict with a single ``"message"`` key describing the outcome:
            success, user not found, or user without usable Slack details.
        """
        query = (
            select(User).options(joinedload(User.slack_team)).filter(User.id == user_id)
        )
        result = await self.user_repository.execute(query=query)
        user = result.scalar()
        logger.info(f"User: {user}")

        if not user:
            logger.error(f"User not found for user_id: {user_id}")
            return {"message": "User not found"}

        # Check the values themselves: ``hasattr`` only proves the attribute
        # exists, and a missing team would fail on ``.token`` below.
        slack_id = getattr(user, "slack_id", None)
        slack_team = getattr(user, "slack_team", None)
        if not slack_id or slack_team is None:
            logger.error(f"Slack details missing for user_id: {user_id}")
            return {"message": "User has no Slack account linked"}

        async with SlackBotService() as slack_bot_service:
            await slack_bot_service.send_im_message(
                slack_id, message, slack_team.token
            )
        return {"message": "Message sent successfully"}