| from datetime import datetime | |
| from typing import List, Optional | |
| from uuid import UUID, uuid4 | |
| from fastapi import HTTPException, status | |
| from app.api.messages.parser import MessageLineParser | |
| from app.api.messages.schemas import Message, MessageCreate | |
| from app.api.participants.parser import ParticipantLineParser | |
| from app.api.participants.schemas import Participant | |
| from app.api.topics.schemas import Topic | |
| from app.core.config import ( | |
| FORBIDDEN_WORDS, | |
| FORBIDDEN_WORDS_MESSAGE, | |
| PARTICIPANT_NOT_FOUND_MESSAGE, | |
| MESSAGE_NOT_FOUND_MESSAGE, | |
| TABLE_PARTICIPANTS, | |
| TABLE_TOPICS, | |
| TOPIC_NOT_FOUND_MESSAGE, | |
| ) | |
| from app.core.file_manager import FileManager | |
| from app.core.link import Link | |
| class MessageService: | |
| def __init__( | |
| self, | |
| file_manager: FileManager, | |
| topic_file_manager: FileManager, | |
| participant_file_manager: FileManager, | |
| ) -> None: | |
| self.file_manager = file_manager | |
| self.topic_file_manager = topic_file_manager | |
| self.participant_file_manager = participant_file_manager | |
| def _check_forbidden_words(self, content: str) -> Optional[str]: | |
| content_lower = content.lower() | |
| for word in FORBIDDEN_WORDS: | |
| if word in content_lower: | |
| return word | |
| return None | |
| def _load_topics(self) -> List[Topic]: | |
| from app.api.topics.parser import TopicLineParser | |
| topics: List[Topic] = [] | |
| for line in self.topic_file_manager.read_lines(): | |
| try: | |
| topics.append(TopicLineParser.parse(line)) | |
| except ValueError: | |
| continue | |
| return topics | |
| def _load_participants(self) -> List[Participant]: | |
| participants: List[Participant] = [] | |
| for line in self.participant_file_manager.read_lines(): | |
| try: | |
| participants.append(ParticipantLineParser.parse(line)) | |
| except ValueError: | |
| continue | |
| return participants | |
| def _get_topic_by_id(self, topic_id: UUID) -> Topic: | |
| for topic in self._load_topics(): | |
| if topic.id == topic_id: | |
| return topic | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=TOPIC_NOT_FOUND_MESSAGE, | |
| ) | |
| def _get_participant_by_id(self, participant_id: UUID) -> Participant: | |
| for participant in self._load_participants(): | |
| if participant.id == participant_id: | |
| return participant | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=PARTICIPANT_NOT_FOUND_MESSAGE, | |
| ) | |
| def _resolve_id(self, link: Link, expected_table: str) -> UUID: | |
| return link.resolve({expected_table: lambda value: value}) | |
| def get_topic(self, topic_link: Link) -> Topic: | |
| topic_id = self._resolve_id(topic_link, TABLE_TOPICS) | |
| return self._get_topic_by_id(topic_id) | |
| def get_participant(self, participant_link: Link) -> Participant: | |
| participant_id = self._resolve_id(participant_link, TABLE_PARTICIPANTS) | |
| return self._get_participant_by_id(participant_id) | |
| def list_messages(self) -> List[Message]: | |
| messages: List[Message] = [] | |
| for line in self.file_manager.read_lines(): | |
| try: | |
| messages.append(MessageLineParser.parse(line)) | |
| except ValueError: | |
| continue | |
| return messages | |
| def list_messages_by_topic(self, topic_id: Link) -> List[Message]: | |
| topic_value = self._resolve_id(topic_id, TABLE_TOPICS) | |
| messages = [message for message in self.list_messages() if message.topic_id.value == topic_value] | |
| return sorted(messages, key=lambda message: message.order_in_topic) | |
| def create_message(self, payload: MessageCreate) -> Message: | |
| forbidden_word = self._check_forbidden_words(payload.content) | |
| if forbidden_word: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=f"{FORBIDDEN_WORDS_MESSAGE}: '{forbidden_word}'", | |
| ) | |
| self.get_topic(payload.topic_id) | |
| self.get_participant(payload.participant_id) | |
| next_order = len(self.list_messages_by_topic(payload.topic_id)) + 1 | |
| message = Message( | |
| id=uuid4(), | |
| topic_id=payload.topic_id, | |
| participant_id=payload.participant_id, | |
| content=payload.content, | |
| order_in_topic=next_order, | |
| created_at=datetime.utcnow(), | |
| ) | |
| serialized = MessageLineParser.serialize(message) | |
| self.file_manager.append_line(serialized) | |
| return message | |
| def get_message(self, message_id: UUID) -> Message: | |
| for message in self.list_messages(): | |
| if message.id == message_id: | |
| return message | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=MESSAGE_NOT_FOUND_MESSAGE, | |
| ) | |
| def export_message(self, message_id: UUID) -> str: | |
| message = self.get_message(message_id) | |
| return MessageLineParser.serialize(message) | |