Spaces:
Build error
Build error
| from typing import Dict, List, Optional | |
| from app.services.cache import MessageCache | |
| from app.services.chat_manager import ChatManager | |
| from app.handlers.media_handler import MediaHandler | |
| from app.services.message_parser import MessageParser | |
| from app.services.download_media import download_whatsapp_media | |
| from app.services.message import process_message_with_llm | |
| from app.models.message_types import Message, MediaType, MediaContent | |
| import logging | |
| logger = logging.getLogger(__name__) | |
class MessageHandler:
    """Coordinate the processing of a single incoming chat message.

    For each raw message the handler parses it, skips it when the message
    cache already knows its id, downloads any attached media, asks the LLM
    for a reply and finally records both the user message and the reply in
    the sender's chat history.
    """

    def __init__(
        self,
        message_cache: MessageCache,
        chat_manager: ChatManager,
        media_handler: MediaHandler,
        logger: logging.Logger
    ):
        """Store the collaborators used while handling messages.

        Args:
            message_cache: Used to detect and remember already-handled ids.
            chat_manager: Holds the per-sender conversation history.
            media_handler: Downloads media attached to a message.
            logger: Logger used for progress and failure reporting.
        """
        self.message_cache = message_cache
        self.chat_manager = chat_manager
        self.media_handler = media_handler
        self.logger = logger

    async def handle(self, raw_message: dict, access_token: str) -> dict:
        """Process one raw message and return a status dictionary.

        Args:
            raw_message: The unparsed message payload.
            access_token: Token forwarded to the media download step.

        Returns:
            One of:
            ``{"status": "duplicate", "message_id": ...}`` when the id is
            already in the cache,
            ``{"status": "success", "message_id": ..., "result": ...}`` when
            the LLM produced a reply, or
            ``{"status": "error", "message_id": ..., "error": ...}`` when any
            step raised. Exceptions are never propagated to the caller.
        """
        try:
            # Parse message
            message = MessageParser.parse(raw_message)

            if self.message_cache.exists(message.id):
                self.logger.info(f"Duplicate message detected and skipped: {message.id}")
                return {"status": "duplicate", "message_id": message.id}

            # Download media
            media_paths = await self._process_media(message, access_token)

            self.chat_manager.initialize_chat(message.sender_id)

            # Process message with LLM
            result = await process_message_with_llm(
                message.sender_id,
                message.content,
                self.chat_manager.get_chat_history(message.sender_id),
                **media_paths
            )

            # Append message to chat to keep track of conversation
            self.chat_manager.append_message(message.sender_id, "user", message.content)
            self.chat_manager.append_message(message.sender_id, "model", result)

            # Only remember the id once the whole pipeline succeeded, so a
            # failed attempt is not later reported as a duplicate.
            self.message_cache.add(message.id)

            return {"status": "success", "message_id": message.id, "result": result}
        except Exception as e:
            # The payload itself may be what is broken (e.g. not a mapping),
            # so look the id up defensively instead of raising a second
            # error from inside the error handler.
            getter = getattr(raw_message, "get", None)
            message_id = getter("id") if callable(getter) else None

            # Record the traceback; previously failures were swallowed
            # without leaving any trace in the logs.
            self.logger.exception("Failed to handle message %s", message_id)

            return {"status": "error", "message_id": message_id, "error": str(e)}

    async def _process_media(self, message: Message, access_token: str) -> Dict[str, Optional[str]]:
        """Download every media item of ``message`` and collect the paths.

        Args:
            message: Parsed message whose ``media`` mapping (media type ->
                content) is iterated; may be empty or ``None``.
            access_token: Token passed to the media handler's download call.

        Returns:
            A dict with exactly the keys ``image_file_path``, ``doc_path``
            and ``video_file_path``. A key stays ``None`` when the message
            carries no media of that kind. Media of any other type is still
            downloaded but its path is not recorded.
        """
        media_paths = {
            "image_file_path": None,
            "doc_path": None,
            "video_file_path": None
        }

        if not message.media:
            return media_paths

        for media_type, content in message.media.items():
            self.logger.info(f"Processing {media_type.value}: {content.file_path}")
            file_path = await self.media_handler.download(
                content.id,
                access_token,
                content.file_path
            )
            self.logger.info(f"{media_type.value} file_path: {file_path}")

            if media_type == MediaType.IMAGE:
                media_paths["image_file_path"] = file_path
            elif media_type == MediaType.DOCUMENT:
                media_paths["doc_path"] = file_path
            elif media_type == MediaType.VIDEO:
                media_paths["video_file_path"] = file_path

        return media_paths