# trykopy / conversation / conversation_store.py
# Pavol Liška: "cleanup requirements, fix api" (b11dd45)
# 2.74 kB
import os
from bson import ObjectId
from dotenv import load_dotenv
from pymongo import MongoClient
from dto.conversation import Conversation, OneShotConversation
# Run before the class definitions below: ConversationStore reads its
# connection settings from os.environ while its class body executes, so any
# values supplied through a .env file must already be loaded by then.
load_dotenv()
class ConversationStorage:
    """Thin persistence layer over a single MongoDB collection.

    Conversations are written as the dict returned by their ``model_dump()``
    and read back by passing the stored document's fields to the
    ``Conversation`` constructor.
    """

    def __init__(self, db_uri, db_name, collection_name):
        """Connect to MongoDB and bind to one collection.

        Args:
            db_uri: Connection string handed to ``MongoClient``.
            db_name: Name of the database that holds the collection.
            collection_name: Name of the collection to read and write.
        """
        self.client = MongoClient(db_uri)
        self.collection = self.client[db_name][collection_name]

    @staticmethod
    def _as_object_id(oid):
        """Return *oid* as an ``ObjectId`` when it is a valid id string.

        ``insert_one`` assigns an ``ObjectId`` to documents that carry no
        ``_id``, so the string form of such an id never matches the stored
        key. Any other value (an ``ObjectId``, a custom key, a string that is
        not a valid id) is returned unchanged.
        """
        if isinstance(oid, str) and ObjectId.is_valid(oid):
            return ObjectId(oid)
        return oid

    @staticmethod
    def _hydrate(raw) -> "Conversation":
        """Build a ``Conversation`` from a stored document and log it."""
        document = Conversation(**raw)
        # The original format string had a single placeholder, so the
        # creation timestamp was passed in but never printed.
        print("Got {} stored at {}".format(document.name, document.created))
        return document

    def store_document_mongodb(self, conversation):
        """Insert *conversation* and return the id of the new document."""
        return self.collection.insert_one(conversation.model_dump()).inserted_id

    def get_all(self) -> "list[Conversation]":
        """Return every stored conversation."""
        return [Conversation(**raw) for raw in self.collection.find()]

    def get_next(self, offset) -> "Conversation":
        """Return the conversation found after skipping *offset* documents.

        Raises:
            IndexError: If no document exists at that offset.
        """
        raw = self.collection.find().limit(1).skip(offset)[0]
        return self._hydrate(raw)

    def get_one(self, oid) -> "Conversation":
        """Return the conversation whose ``_id`` is *oid*.

        Args:
            oid: An ``ObjectId`` or its string form.

        Raises:
            IndexError: If no document has that id.
        """
        raw = self.collection.find({"_id": self._as_object_id(oid)})[0]
        return self._hydrate(raw)

    def count(self):
        """Return the collection's estimated document count."""
        return self.collection.estimated_document_count()

    def update(self, oid, conversation):
        """Overwrite the stored fields of document *oid* with *conversation*.

        Args:
            oid: An ``ObjectId`` or its string form.
            conversation: Object whose ``model_dump()`` gives the new values.
        """
        # update_one only accepts update-operator documents; passing the raw
        # dump raises ValueError, so the fields are wrapped in $set.
        self.collection.update_one(
            {"_id": self._as_object_id(oid)},
            {"$set": conversation.model_dump()},
        )
class ConversationStore:
    """Application-facing facade over one shared ``ConversationStorage``.

    The connection settings are read from the environment, and the storage
    is created, while this class body executes. A missing variable therefore
    raises ``KeyError`` as soon as the module is imported.
    """

    URI = os.environ["DB_CONN_LOG"]
    DB_NAME = os.environ["MONGODB_DB_NAME_MSG_LOG"]
    COLLECTION_NAME = os.environ["MONGODB_COL_NAME_MSG_LOG"]
    # Class attribute: every ConversationStore instance shares this storage.
    storage = ConversationStorage(URI, DB_NAME, COLLECTION_NAME)

    @staticmethod
    def _single_turn(q, a, sources, params):
        """Wrap one question/answer exchange in a ``Conversation``."""
        turn = OneShotConversation(q=q, a=a, sources=sources)
        return Conversation(conversation=[turn], params=params)

    @staticmethod
    def _as_object_id(oid):
        """Return *oid* as an ``ObjectId`` when it is a valid id string.

        ``save_content`` hands ids out as strings, while the storage queries
        ``_id`` with the value it is given; converting here lets those
        strings be used for lookups. Other values pass through unchanged.
        """
        if isinstance(oid, str) and ObjectId.is_valid(oid):
            return ObjectId(oid)
        return oid

    def save_content(self, q, a=None, sources: list[str] = None, params: dict[str, str] = None) -> str:
        """Store a single-exchange conversation and return its id.

        Args:
            q: The question.
            a: The answer, if there is one yet.
            sources: Sources attached to the answer; may be ``None``.
            params: Parameters stored alongside the conversation; may be
                ``None``.

        Returns:
            The id of the inserted document, as a string.
        """
        inserted_id = self.storage.store_document_mongodb(
            self._single_turn(q, a, sources, params)
        )
        return str(inserted_id)

    def get_all(self):
        """Return every stored conversation."""
        return self.storage.get_all()

    def get_one(self, offset):
        """Return the conversation at position *offset* in the collection."""
        return self.storage.get_next(offset)

    def get(self, oid: str) -> Conversation:
        """Return the conversation with id *oid* (as given by ``save_content``)."""
        return self.storage.get_one(self._as_object_id(oid))

    def count(self):
        """Return the number of stored conversations, as reported by the storage."""
        return self.storage.count()

    def update(self, oid, q, a, sources, params):
        """Replace the content of conversation *oid* with a single exchange.

        Args:
            oid: Id of the document to update, as an ``ObjectId`` or string.
            q: The question.
            a: The answer.
            sources: Sources attached to the answer.
            params: Parameters stored alongside the conversation.
        """
        self.storage.update(
            self._as_object_id(oid),
            self._single_turn(q, a, sources, params),
        )