# lab4/app/api/messages/services.py
# brestok's picture
# init
# a08f988
from datetime import datetime, timezone
from typing import List, Optional
from uuid import UUID, uuid4

from fastapi import HTTPException, status

from app.api.messages.parser import MessageLineParser
from app.api.messages.schemas import Message, MessageCreate
from app.api.participants.parser import ParticipantLineParser
from app.api.participants.schemas import Participant
from app.api.topics.schemas import Topic
from app.core.config import (
    FORBIDDEN_WORDS,
    FORBIDDEN_WORDS_MESSAGE,
    PARTICIPANT_NOT_FOUND_MESSAGE,
    MESSAGE_NOT_FOUND_MESSAGE,
    TABLE_PARTICIPANTS,
    TABLE_TOPICS,
    TOPIC_NOT_FOUND_MESSAGE,
)
from app.core.file_manager import FileManager
from app.core.link import Link


class MessageService:
    """Create, list and export messages stored line by line via a ``FileManager``.

    Messages, topics and participants are each read from their own
    ``FileManager`` and decoded with the matching line parser. Lines that a
    parser rejects with ``ValueError`` are skipped instead of aborting the read.
    """

    def __init__(
        self,
        file_manager: FileManager,
        topic_file_manager: FileManager,
        participant_file_manager: FileManager,
    ) -> None:
        """Keep the file managers backing messages, topics and participants."""
        self.file_manager = file_manager
        self.topic_file_manager = topic_file_manager
        self.participant_file_manager = participant_file_manager

    @staticmethod
    def _parse_lines(lines, parse) -> list:
        """Return ``parse(line)`` for every line, skipping lines that fail.

        Args:
            lines: Iterable of raw lines (the result of ``read_lines()``).
            parse: Callable turning one line into a record; it signals a
                malformed line by raising ``ValueError``.

        Returns:
            The successfully parsed records, in input order.
        """
        records = []
        for line in lines:
            try:
                records.append(parse(line))
            except ValueError:
                # Best-effort read: a malformed line is ignored, not fatal.
                continue
        return records

    def _check_forbidden_words(self, content: str) -> Optional[str]:
        """Return the first configured forbidden word found in ``content``.

        The match is a case-insensitive substring test. Empty entries in
        ``FORBIDDEN_WORDS`` are ignored: an empty string is a substring of
        every text, and returning it would both look like "no match" to a
        truthiness check and hide any real forbidden word listed after it.

        Returns:
            The forbidden word exactly as configured, or ``None`` if none
            occurs in ``content``.
        """
        content_lower = content.lower()
        for word in FORBIDDEN_WORDS:
            # Lower-case both sides so entries configured with capitals match.
            if word and word.lower() in content_lower:
                return word
        return None

    def _load_topics(self) -> List[Topic]:
        """Return every topic that parses successfully from the topic file."""
        # Imported at call time rather than at module level - presumably to
        # avoid a circular import with the topics package; confirm before
        # hoisting it to the top of the file.
        from app.api.topics.parser import TopicLineParser

        return self._parse_lines(
            self.topic_file_manager.read_lines(), TopicLineParser.parse
        )

    def _load_participants(self) -> List[Participant]:
        """Return every participant that parses successfully from its file."""
        return self._parse_lines(
            self.participant_file_manager.read_lines(), ParticipantLineParser.parse
        )

    def _get_topic_by_id(self, topic_id: UUID) -> Topic:
        """Return the topic whose ``id`` equals ``topic_id``.

        Raises:
            HTTPException: 404 with ``TOPIC_NOT_FOUND_MESSAGE`` if no topic
                matches.
        """
        for topic in self._load_topics():
            if topic.id == topic_id:
                return topic
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=TOPIC_NOT_FOUND_MESSAGE,
        )

    def _get_participant_by_id(self, participant_id: UUID) -> Participant:
        """Return the participant whose ``id`` equals ``participant_id``.

        Raises:
            HTTPException: 404 with ``PARTICIPANT_NOT_FOUND_MESSAGE`` if no
                participant matches.
        """
        for participant in self._load_participants():
            if participant.id == participant_id:
                return participant
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=PARTICIPANT_NOT_FOUND_MESSAGE,
        )

    def _resolve_id(self, link: Link, expected_table: str) -> UUID:
        """Resolve ``link`` against ``expected_table`` and return the result.

        Only ``expected_table`` is offered to ``Link.resolve``, paired with an
        identity handler, so the resolved value comes back unchanged.
        NOTE(review): how ``Link.resolve`` reacts to a link that targets a
        different table is not visible here - presumably it raises; confirm.
        """
        return link.resolve({expected_table: lambda value: value})

    def get_topic(self, topic_link: Link) -> Topic:
        """Return the topic referenced by ``topic_link``.

        Raises:
            HTTPException: 404 if no stored topic has the resolved id.
        """
        topic_id = self._resolve_id(topic_link, TABLE_TOPICS)
        return self._get_topic_by_id(topic_id)

    def get_participant(self, participant_link: Link) -> Participant:
        """Return the participant referenced by ``participant_link``.

        Raises:
            HTTPException: 404 if no stored participant has the resolved id.
        """
        participant_id = self._resolve_id(participant_link, TABLE_PARTICIPANTS)
        return self._get_participant_by_id(participant_id)

    def list_messages(self) -> List[Message]:
        """Return every message that parses successfully, in file order."""
        return self._parse_lines(
            self.file_manager.read_lines(), MessageLineParser.parse
        )

    def list_messages_by_topic(self, topic_id: Link) -> List[Message]:
        """Return the messages of one topic, sorted by ``order_in_topic``.

        Args:
            topic_id: Link to the topic; it is resolved against the topics
                table before being compared with each message's
                ``topic_id.value``.
        """
        topic_value = self._resolve_id(topic_id, TABLE_TOPICS)
        messages = [
            message
            for message in self.list_messages()
            if message.topic_id.value == topic_value
        ]
        return sorted(messages, key=lambda message: message.order_in_topic)

    def create_message(self, payload: MessageCreate) -> Message:
        """Validate ``payload``, append the new message to the file, return it.

        Validation runs in this order: forbidden-word check, topic exists,
        participant exists.

        Raises:
            HTTPException: 400 if the content contains a forbidden word;
                404 if the referenced topic or participant does not exist.
        """
        forbidden_word = self._check_forbidden_words(payload.content)
        if forbidden_word is not None:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"{FORBIDDEN_WORDS_MESSAGE}: '{forbidden_word}'",
            )

        # Called only for their 404 side effect when the target is missing.
        self.get_topic(payload.topic_id)
        self.get_participant(payload.participant_id)

        # Continue from the highest existing order instead of the message
        # count: skipped (unparseable) lines or other gaps would otherwise
        # make the count lag behind and hand out a duplicate order number.
        existing = self.list_messages_by_topic(payload.topic_id)
        next_order = max(
            (message.order_in_topic for message in existing), default=0
        ) + 1

        message = Message(
            id=uuid4(),
            topic_id=payload.topic_id,
            participant_id=payload.participant_id,
            content=payload.content,
            order_in_topic=next_order,
            # Same naive-UTC value the deprecated datetime.utcnow() produced,
            # so the serialized timestamp format stays unchanged.
            created_at=datetime.now(timezone.utc).replace(tzinfo=None),
        )
        serialized = MessageLineParser.serialize(message)
        self.file_manager.append_line(serialized)
        return message

    def get_message(self, message_id: UUID) -> Message:
        """Return the message whose ``id`` equals ``message_id``.

        Raises:
            HTTPException: 404 with ``MESSAGE_NOT_FOUND_MESSAGE`` if no
                message matches.
        """
        for message in self.list_messages():
            if message.id == message_id:
                return message
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=MESSAGE_NOT_FOUND_MESSAGE,
        )

    def export_message(self, message_id: UUID) -> str:
        """Return the stored-line serialization of the message ``message_id``.

        Raises:
            HTTPException: 404 if the message does not exist.
        """
        message = self.get_message(message_id)
        return MessageLineParser.serialize(message)