notebook-backend / utils /spaces_manager.py
mohhhhhit's picture
Update utils/spaces_manager.py
cdec65b verified
import os
import uuid
from typing import List, Dict, Optional
from datetime import datetime
from pymongo import MongoClient
class SpacesManager:
    """Manages creation and retrieval of Spaces using MongoDB Atlas.

    The connection string is read from the ``MONGO_URI`` environment
    variable. When it is not set, ``collection`` is ``None``: read and
    delete operations degrade to no-ops, while ``create_space`` raises.
    """

    def __init__(self):
        """Connect to the ``spaces`` collection of ``notebookpro_db``."""
        self.mongo_uri = os.getenv("MONGO_URI")
        if self.mongo_uri:
            self.client = MongoClient(self.mongo_uri)
            self.db = self.client["notebookpro_db"]
            self.collection = self.db["spaces"]
        else:
            print("WARNING: MONGO_URI not set. SpacesManager will fail.")
            # Keep the attribute set identical in both branches so that
            # callers can safely inspect client/db as well.
            self.client = None
            self.db = None
            self.collection = None

    def get_all_spaces(self) -> List[Dict]:
        """Retrieve all spaces from MongoDB.

        Returns:
            A list of space documents without the internal ``_id`` field,
            or an empty list when no collection is configured.
        """
        if self.collection is None:
            return []
        # Return all documents, but exclude the internal MongoDB '_id' field
        return list(self.collection.find({}, {"_id": 0}))

    def get_space(self, space_id: str) -> Optional[Dict]:
        """Retrieve a single space by ID.

        Args:
            space_id: Value of the document's ``id`` field.

        Returns:
            The matching document without ``_id``, or ``None`` when there
            is no match or no collection is configured.
        """
        if self.collection is None:
            return None
        return self.collection.find_one({"id": space_id}, {"_id": 0})

    def create_space(self, name: str) -> Dict:
        """Create a new space and save it to MongoDB.

        The ID is derived from ``name``: lower-cased, stripped of every
        character that is neither alphanumeric nor a space, with spaces
        turned into underscores. If that ID is already taken, a short
        random hex suffix is appended until an unused one is found.

        Args:
            name: Human-readable name of the space.

        Returns:
            The stored document (``id``, ``name``, ``created_at``) without
            the ``_id`` field.

        Raises:
            RuntimeError: If no MongoDB collection is configured.
        """
        if self.collection is None:
            # Fail with a clear message instead of an AttributeError on None.
            raise RuntimeError(
                "Cannot create space: MONGO_URI is not set, "
                "so no MongoDB collection is available."
            )

        # Generate a clean ID from the name
        base_id = "".join(
            c for c in name.lower() if c.isalnum() or c == " "
        ).replace(" ", "_")
        if not base_id:
            # A name made only of punctuation/symbols would otherwise yield
            # an empty, unaddressable ID.
            base_id = "space"

        # Ensure ID is unique.
        # NOTE(review): check-then-insert is not atomic; a unique index on
        # "id" would be needed to rule out duplicates under concurrent calls.
        space_id = base_id
        while self.collection.find_one({"id": space_id}) is not None:
            space_id = f"{base_id}_{uuid.uuid4().hex[:4]}"

        space_data = {
            "id": space_id,
            "name": name,
            "created_at": datetime.now().isoformat(),
        }
        self.collection.insert_one(space_data)

        # Remove the injected _id before returning to FastAPI
        space_data.pop("_id", None)
        return space_data

    def delete_space(self, space_id: str) -> None:
        """Delete a space from MongoDB.

        Does nothing when no collection is configured.

        Args:
            space_id: Value of the ``id`` field of the document to delete.
        """
        if self.collection is not None:
            self.collection.delete_one({"id": space_id})