| import os | |
| import pathlib | |
| from typing import NewType | |
| import pydantic | |
| import player | |
| import message | |
| from pymongo import MongoClient | |
| from pydantic import BaseModel | |
# Storage backend selector, read once at import time. ``save`` compares it
# case-insensitively against "JSONL" and "MONGODB"; defaults to "JSONL".
DATA_COLLECTION_MODE = os.getenv("DATA_COLLECTION_MODE", "JSONL")

# MongoDB settings used by ``save`` in MONGODB mode. Both are ``None`` when
# the corresponding environment variable is not set.
MONGODB_CONNECTION_STRING = os.getenv("MONGODB_CONNECTION_STRING")
DB_NAME = os.getenv("MONGODB_NAME")

# Distinct static type used to annotate objects accepted by the logging
# helpers in this module; wraps pydantic's ``BaseModel``.
Model = NewType("Model", BaseModel)
def save(log_object: Model) -> None:
    """Persist ``log_object`` to the backend chosen by ``DATA_COLLECTION_MODE``.

    The target collection name is obtained from ``get_collection``.

    * ``JSONL`` mode: the object is appended as a single JSON line to
      ``<parent of this file's directory>/data/<collection>.jsonl``. The
      ``data`` directory is created if it does not exist yet.
    * ``MONGODB`` mode: the object is inserted as one document into the
      collection of the same name in database ``DB_NAME``.

    The mode comparison is case-insensitive. For any other mode value
    nothing is written.

    Args:
        log_object: Model instance to record; must provide
            ``model_dump_json()`` and ``model_dump()``.

    Raises:
        ValueError: Propagated from ``get_collection`` when the type of
            ``log_object`` is not recognised.
    """
    collection = get_collection(log_object)
    mode = DATA_COLLECTION_MODE.upper()

    if mode == "JSONL":
        data_dir = pathlib.Path(__file__).parent.parent / "data"
        # Without this the very first write fails with FileNotFoundError
        # on a checkout that has no data directory yet.
        data_dir.mkdir(parents=True, exist_ok=True)
        log_file = data_dir / f"{collection}.jsonl"
        # Explicit UTF-8 so non-ASCII content in the JSON does not depend on
        # (or fail under) the platform's default locale encoding.
        with open(log_file, "a", encoding="utf-8") as f:
            f.write(log_object.model_dump_json() + "\n")
    elif mode == "MONGODB":
        client = MongoClient(MONGODB_CONNECTION_STRING)
        try:
            client[DB_NAME][collection].insert_one(log_object.model_dump())
        finally:
            # A client is created per call, so release its connections here
            # instead of leaking one client per saved object.
            client.close()
def get_collection(log_object: Model) -> str:
    """Return the name of the collection that ``log_object`` is stored in.

    Types are checked in this order: ``message.AgentMessage`` ->
    ``"messages"``, ``player.Player`` -> ``"players"``, ``Game`` ->
    ``"games"``.

    Args:
        log_object: Object whose storage collection is being looked up.

    Returns:
        The collection name for the first matching type.

    Raises:
        ValueError: If ``log_object`` is not an instance of any of the
            types above.
    """
    # Imported at call time rather than at module top
    # (presumably to avoid a circular import with ``game`` - confirm).
    from game import Game

    if isinstance(log_object, message.AgentMessage):
        return "messages"
    if isinstance(log_object, player.Player):
        return "players"
    if isinstance(log_object, Game):
        return "games"
    raise ValueError(f"Unknown log object type: {type(log_object)}")