Spaces:
Runtime error
Runtime error
| from agency_swarm.tools import BaseTool | |
| from pydantic import Field | |
| import os | |
| import psycopg2 # For PostgreSQL | |
| import pymongo # For MongoDB | |
# Connection strings for the supported back ends, read from the environment.
# Each value is None when its environment variable is not set.
POSTGRESQL_DB_URL = os.environ.get("POSTGRESQL_DB_URL")
MONGODB_URI = os.environ.get("MONGODB_URI")
class MarketDataDatabaseConnector(BaseTool):
    """
    This tool connects to various databases to fetch market data.
    It supports different types of databases such as SQL and NoSQL.
    The tool handles authentication, data queries, and error handling for each database type.

    Query format:
        * ``postgresql``: ``query`` is passed to the cursor unchanged as SQL.
        * ``mongodb``: ``query`` is ``"<collection>: <filter>"`` where
          ``<filter>`` is a JSON object or a Python dict literal, e.g.
          ``'prices: {"symbol": "AAPL"}'``.

    Failures are reported as a human-readable string returned from ``run``
    rather than raised.
    """

    db_type: str = Field(
        ..., description="The type of database to connect to (e.g., 'postgresql', 'mongodb')."
    )
    query: str = Field(
        ..., description="The query to execute on the database."
    )

    def run(self):
        """
        Connects to the specified database and executes the provided query.

        Returns:
            The fetched rows/documents on success, or a string describing the
            problem (unsupported ``db_type``, bad query, database error).
        """
        if self.db_type == "postgresql":
            return self._fetch_from_postgresql()
        elif self.db_type == "mongodb":
            return self._fetch_from_mongodb()
        else:
            return "Unsupported database type."

    def _fetch_from_postgresql(self):
        """
        Run ``self.query`` against the PostgreSQL database at POSTGRESQL_DB_URL.

        Returns:
            The result of ``cursor.fetchall()``, or an error string when the
            connection URL is missing or the driver raises a DatabaseError.
        """
        if not POSTGRESQL_DB_URL:
            # Report a clear message instead of handing None to the driver.
            return "Error fetching data from PostgreSQL: POSTGRESQL_DB_URL is not set."

        conn = None
        try:
            # Connect to PostgreSQL database
            conn = psycopg2.connect(POSTGRESQL_DB_URL)
            with conn.cursor() as cursor:
                # NOTE(review): the SQL text is executed verbatim; use
                # read-only database credentials if the query text can come
                # from an untrusted source.
                cursor.execute(self.query)
                return cursor.fetchall()
        except psycopg2.DatabaseError as e:
            return f"Error fetching data from PostgreSQL: {e}"
        finally:
            # Close the connection on the failure path too; previously it
            # leaked whenever execute()/fetchall() raised.
            if conn is not None:
                conn.close()

    def _fetch_from_mongodb(self):
        """
        Run a ``find`` on the default database of MONGODB_URI.

        ``self.query`` must look like ``"<collection>: <filter>"``. The filter
        is parsed as JSON first and as a Python literal second; it is never
        evaluated as code.

        Returns:
            A list of matching documents, or an error string when the query
            is malformed, the URI is missing, or PyMongo raises.
        """
        # Local imports keep this method self-contained without touching the
        # module's import block.
        import ast
        import json

        collection_name, separator, raw_filter = self.query.partition(":")
        collection_name = collection_name.strip()
        raw_filter = raw_filter.strip()
        if not separator or not collection_name:
            return (
                "Error fetching data from MongoDB: "
                "query must have the form '<collection>: <filter>'."
            )

        # SECURITY: this used to be eval(raw_filter), which executes arbitrary
        # Python supplied by the caller. Only data literals are accepted now;
        # expressions and function calls are rejected on purpose.
        try:
            query_filter = json.loads(raw_filter)
        except ValueError:
            try:
                query_filter = ast.literal_eval(raw_filter)
            except (ValueError, SyntaxError, TypeError) as e:
                return f"Error fetching data from MongoDB: invalid query filter: {e}"

        # None is tolerated because it was accepted before this change.
        if query_filter is not None and not isinstance(query_filter, dict):
            return (
                "Error fetching data from MongoDB: "
                "query filter must be a JSON object / dict."
            )

        if not MONGODB_URI:
            return "Error fetching data from MongoDB: MONGODB_URI is not set."

        client = None
        try:
            # Connect to MongoDB
            client = pymongo.MongoClient(MONGODB_URI)
            db = client.get_default_database()
            return list(db[collection_name].find(query_filter))
        except pymongo.errors.PyMongoError as e:
            return f"Error fetching data from MongoDB: {e}"
        finally:
            # Always release the client, including when find() fails.
            if client is not None:
                client.close()