Spaces:
Paused
Paused
| import os | |
| from bson import ObjectId | |
| from dotenv import load_dotenv | |
| from pymongo import MongoClient | |
| from dto.conversation import Conversation, OneShotConversation | |
| load_dotenv() | |
| class ConversationStorage: | |
| def __init__(self, db_uri, db_name, collection_name): | |
| self.client = MongoClient(db_uri) | |
| self.collection = self.client[db_name][collection_name] | |
| def store_document_mongodb(self, conversation): | |
| return self.collection.insert_one(conversation.model_dump()).inserted_id | |
| def get_all(self): | |
| docs = [] | |
| for q in self.collection.find(): | |
| docs.append(Conversation(**q)) | |
| return docs | |
| def get_next(self, offset): | |
| q = self.collection.find().limit(1).skip(offset)[0] | |
| document = Conversation(**q) | |
| print("Got {} stored at ".format(document.name, document.created)) | |
| return document | |
| def get_one(self, oid) -> Conversation: | |
| q = self.collection.find({"_id": oid})[0] | |
| document = Conversation(**q) | |
| print("Got {} stored at ".format(document.name, document.created)) | |
| return document | |
| def count(self): | |
| return self.collection.estimated_document_count() | |
| def update(self, oid, conversation): | |
| self.collection.update_one( | |
| filter={"_id": oid}, | |
| update=conversation.model_dump() | |
| ) | |
| class ConversationStore: | |
| URI = os.environ["DB_CONN_LOG"] | |
| DB_NAME = os.environ["MONGODB_DB_NAME_MSG_LOG"] | |
| COLLECTION_NAME = os.environ["MONGODB_COL_NAME_MSG_LOG"] | |
| storage = ConversationStorage(URI, DB_NAME, COLLECTION_NAME) | |
| def save_content(self, q, a=None, sources: list[str] = None, params: dict[str, str] = None) -> ObjectId: | |
| return self.storage.store_document_mongodb( | |
| Conversation( | |
| conversation=[ | |
| OneShotConversation( | |
| q=q, | |
| a=a, | |
| sources=sources | |
| ) | |
| ], | |
| params=params | |
| ) | |
| ).__str__() | |
| def get_all(self): | |
| return self.storage.get_all() | |
| def get_one(self, offset): | |
| return self.storage.get_next(offset) | |
| def get(self, oid: str) -> Conversation: | |
| return self.storage.get_one(oid) | |
| def count(self): | |
| return self.storage.count() | |
| def update(self, oid, q, a, sources, params): | |
| self.storage.update( | |
| oid, | |
| Conversation( | |
| conversation=[ | |
| OneShotConversation( | |
| q=q, | |
| a=a, | |
| sources=sources | |
| ) | |
| ], | |
| params=params | |
| ) | |
| ) | |