# Source: agent1/DataCollectorAgent/tools/MarketDataDatabaseConnector.py
# Uploaded by varun324242 using huggingface_hub (commit 06ba83e, verified).
import ast
import json
import os

import psycopg2  # For PostgreSQL
import pymongo  # For MongoDB
from agency_swarm.tools import BaseTool
from pydantic import Field
# Connection strings are taken from the process environment; each constant is
# None when its variable is not set.
POSTGRESQL_DB_URL = os.environ.get("POSTGRESQL_DB_URL")  # PostgreSQL DSN / URL
MONGODB_URI = os.environ.get("MONGODB_URI")  # MongoDB connection URI
class MarketDataDatabaseConnector(BaseTool):
    """
    Connect to a database and run a single query to fetch market data.

    Two back ends are supported: PostgreSQL (through psycopg2) and MongoDB
    (through pymongo). Connection strings come from the module-level
    ``POSTGRESQL_DB_URL`` and ``MONGODB_URI`` constants.

    Failures are reported by *returning* a human-readable error string from
    ``run`` instead of raising, so callers always get either the fetched data
    or a message describing what went wrong.
    """

    db_type: str = Field(
        ..., description="The type of database to connect to (e.g., 'postgresql', 'mongodb')."
    )
    query: str = Field(
        ..., description="The query to execute on the database."
    )

    def run(self):
        """
        Execute ``query`` against the database selected by ``db_type``.

        ``db_type`` is matched case-insensitively and ignoring surrounding
        whitespace.

        Returns:
            The fetched rows/documents as a list on success, or a string
            describing the error (including an unsupported ``db_type``).
        """
        db_type = self.db_type.strip().lower()
        if db_type == "postgresql":
            return self._fetch_from_postgresql()
        if db_type == "mongodb":
            return self._fetch_from_mongodb()
        return "Unsupported database type."

    def _fetch_from_postgresql(self):
        """
        Run ``query`` as SQL on the PostgreSQL server at ``POSTGRESQL_DB_URL``.

        Returns:
            The result of ``cursor.fetchall()`` on success, otherwise an error
            string prefixed with "Error fetching data from PostgreSQL:".
        """
        if not POSTGRESQL_DB_URL:
            return "Error fetching data from PostgreSQL: POSTGRESQL_DB_URL is not set."

        conn = None
        try:
            conn = psycopg2.connect(POSTGRESQL_DB_URL)
            # The cursor context manager closes the cursor even if execute()
            # or fetchall() raises.
            with conn.cursor() as cursor:
                cursor.execute(self.query)
                return cursor.fetchall()
        except psycopg2.Error as e:
            # psycopg2.Error is the base of DatabaseError and InterfaceError,
            # so connection-level failures are reported the same way.
            return f"Error fetching data from PostgreSQL: {e}"
        finally:
            # Always release the connection; the original only closed it on
            # the success path. Nothing is committed before closing.
            if conn is not None:
                conn.close()

    @staticmethod
    def _parse_mongo_filter(raw_filter):
        """
        Parse the filter part of a MongoDB query string into a dict.

        The text is parsed strictly as data: first as JSON, then as a Python
        literal. Nothing in it is ever executed.

        Raises:
            ValueError: if the text is neither valid JSON nor a Python literal,
                or if it does not describe a dictionary.
        """
        text = raw_filter.strip()
        try:
            parsed = json.loads(text)
        except ValueError:
            # Not JSON; accept Python dict literals too (single quotes,
            # True/False/None), which the previous implementation allowed.
            try:
                parsed = ast.literal_eval(text)
            except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError) as e:
                raise ValueError(
                    f"not a JSON object or Python dict literal ({e})"
                ) from None
        if not isinstance(parsed, dict):
            raise ValueError("filter must be a JSON object / dict")
        return parsed

    def _fetch_from_mongodb(self):
        """
        Run ``query`` against the default database of ``MONGODB_URI``.

        ``query`` must have the form ``"<collection>: <filter>"`` where
        ``<filter>`` is a JSON object or Python dict literal, e.g.
        ``'prices: {"symbol": "AAPL"}'``.

        Returns:
            A list of matching documents on success, otherwise an error string
            prefixed with "Error fetching data from MongoDB:".
        """
        if not MONGODB_URI:
            return "Error fetching data from MongoDB: MONGODB_URI is not set."

        # Split on the first ':' only, because the filter itself contains colons.
        collection_name, separator, raw_filter = self.query.partition(":")
        collection_name = collection_name.strip()
        if not separator or not collection_name:
            return (
                "Error fetching data from MongoDB: "
                "query must have the form '<collection>: <filter>'."
            )

        # NOTE(security): this used to be eval(query_filter), which executes
        # arbitrary Python supplied in the query text. The filter is now parsed
        # as data only; filters that relied on Python expressions (function
        # calls, names other than True/False/None) are rejected.
        try:
            query_filter = self._parse_mongo_filter(raw_filter)
        except ValueError as e:
            return f"Error fetching data from MongoDB: invalid query filter: {e}"

        client = None
        try:
            client = pymongo.MongoClient(MONGODB_URI)
            collection = client.get_default_database()[collection_name]
            # Materialise the cursor before the client is closed.
            return list(collection.find(query_filter))
        except pymongo.errors.PyMongoError as e:
            return f"Error fetching data from MongoDB: {e}"
        finally:
            # Always release the client; the original leaked it on errors.
            if client is not None:
                client.close()