Spaces:
Sleeping
Sleeping
| # backend/src/services/connectors/base.py | |
| from abc import ABC, abstractmethod | |
| from typing import List, Dict, Any, Optional | |
| class NoSQLConnector(ABC): | |
| """ | |
| Abstract Base Class for Universal NoSQL Connectivity. | |
| Any database (Mongo, DynamoDB, Firebase) must implement these methods. | |
| """ | |
| def connect(self): | |
| """Establish connection to the database.""" | |
| pass | |
| def disconnect(self): | |
| """Close the connection.""" | |
| pass | |
| def get_schema_summary(self) -> str: | |
| """ | |
| Returns a string description of collections and fields. | |
| Crucial for the LLM to understand what to query. | |
| """ | |
| pass | |
| def find_one(self, collection: str, query: Dict[str, Any]) -> Optional[Dict[str, Any]]: | |
| """Retrieve a single document matching the query.""" | |
| pass | |
| def find_many(self, collection: str, query: Dict[str, Any], limit: int = 5) -> List[Dict[str, Any]]: | |
| """Retrieve multiple documents matching the query.""" | |
| pass |