import os from bson import ObjectId from dotenv import load_dotenv from pymongo import MongoClient from dto.conversation import Conversation, OneShotConversation load_dotenv() class ConversationStorage: def __init__(self, db_uri, db_name, collection_name): self.client = MongoClient(db_uri) self.collection = self.client[db_name][collection_name] def store_document_mongodb(self, conversation): return self.collection.insert_one(conversation.model_dump()).inserted_id def get_all(self): docs = [] for q in self.collection.find(): docs.append(Conversation(**q)) return docs def get_next(self, offset): q = self.collection.find().limit(1).skip(offset)[0] document = Conversation(**q) print("Got {} stored at ".format(document.name, document.created)) return document def get_one(self, oid) -> Conversation: q = self.collection.find({"_id": oid})[0] document = Conversation(**q) print("Got {} stored at ".format(document.name, document.created)) return document def count(self): return self.collection.estimated_document_count() def update(self, oid, conversation): self.collection.update_one( filter={"_id": oid}, update=conversation.model_dump() ) class ConversationStore: URI = os.environ["DB_CONN_LOG"] DB_NAME = os.environ["MONGODB_DB_NAME_MSG_LOG"] COLLECTION_NAME = os.environ["MONGODB_COL_NAME_MSG_LOG"] storage = ConversationStorage(URI, DB_NAME, COLLECTION_NAME) def save_content(self, q, a=None, sources: list[str] = None, params: dict[str, str] = None) -> ObjectId: return self.storage.store_document_mongodb( Conversation( conversation=[ OneShotConversation( q=q, a=a, sources=sources ) ], params=params ) ).__str__() def get_all(self): return self.storage.get_all() def get_one(self, offset): return self.storage.get_next(offset) def get(self, oid: str) -> Conversation: return self.storage.get_one(oid) def count(self): return self.storage.count() def update(self, oid, q, a, sources, params): self.storage.update( oid, Conversation( conversation=[ OneShotConversation( q=q, a=a, sources=sources ) ], params=params ) )