Spaces:
Sleeping
Sleeping
File size: 1,112 Bytes
ba2fc46 370480b | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 | # backend/src/services/connectors/base.py
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional
class NoSQLConnector(ABC):
"""
Abstract Base Class for Universal NoSQL Connectivity.
Any database (Mongo, DynamoDB, Firebase) must implement these methods.
"""
@abstractmethod
def connect(self):
"""Establish connection to the database."""
pass
@abstractmethod
def disconnect(self):
"""Close the connection."""
pass
@abstractmethod
def get_schema_summary(self) -> str:
"""
Returns a string description of collections and fields.
Crucial for the LLM to understand what to query.
"""
pass
@abstractmethod
def find_one(self, collection: str, query: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Retrieve a single document matching the query."""
pass
@abstractmethod
def find_many(self, collection: str, query: Dict[str, Any], limit: int = 5) -> List[Dict[str, Any]]:
"""Retrieve multiple documents matching the query."""
pass |