# import os # import psycopg2 # from psycopg2.extras import RealDictCursor # from dotenv import load_dotenv # # Load environment variables # load_dotenv() # # Database Configuration # DB_HOST = os.getenv("DB_HOST") # DB_NAME = os.getenv("DB_NAME") # DB_USER = os.getenv("DB_USER") # DB_PASSWORD = os.getenv("DB_PASSWORD") # DB_PORT = os.getenv("DB_PORT") # def connect_db(): # """Connects to PostgreSQL database.""" # try: # conn = psycopg2.connect( # host=DB_HOST, # database=DB_NAME, # user=DB_USER, # password=DB_PASSWORD, # port=DB_PORT, # cursor_factory=RealDictCursor # ) # return conn # except Exception as e: # print("DB conn error") # return f"⚠️ Database connection error: {str(e)}" # def execute_sql_query(sql_query: str): # """ # Executes a SQL query on the Supabase PostgreSQL database. # Args: # sql_query (str): The SQL query to execute. # Returns: # list or str: Query results or an error message. # """ # conn = connect_db() # print(conn) # if isinstance(conn, str): # Error message from connect_db # return conn # try: # with conn.cursor() as cur: # print(f"Executing SQL Query: {sql_query}") # Debugging # cur.execute(sql_query) # result = cur.fetchall() # print(f"Query Result: {result}") # Debugging # conn.close() # return result if result else "🔍 No data found." 
# (tail of the retired, commented-out implementation that precedes this point)
#     except Exception as e:
#         conn.close()
#         return f"⚠️ Database error for query '{sql_query}': {str(e)}"

import os

import psycopg2
from psycopg2.extras import RealDictCursor
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Database Configuration
DB_HOST = os.getenv("DB_HOST")
DB_NAME = os.getenv("DB_NAME")
DB_USER = os.getenv("DB_USER")
DB_PASSWORD = os.getenv("DB_PASSWORD")
DB_PORT = os.getenv("DB_PORT")


class DatabaseConnection:
    """Singleton Database Connection for PostgreSQL.

    Every ``DatabaseConnection()`` call returns the same instance, which owns
    one psycopg2 connection built from the ``DB_*`` module settings. Rows are
    returned as dictionaries because the connection uses ``RealDictCursor``.
    """

    _instance = None

    def __new__(cls):
        if cls._instance is None:
            instance = super().__new__(cls)
            instance.connection = cls._create_connection()
            cls._instance = instance
        return cls._instance

    @staticmethod
    def _create_connection():
        """Open a new connection; return ``None`` (after printing) on failure."""
        try:
            return psycopg2.connect(
                host=DB_HOST,
                database=DB_NAME,
                user=DB_USER,
                password=DB_PASSWORD,
                port=DB_PORT,
                cursor_factory=RealDictCursor,
            )
        except Exception as e:
            # Best-effort by design: callers test the result for None instead
            # of handling a connection exception themselves.
            print(f"Database connection error: {e}")
            return None

    def get_connection(self):
        """Returns the active database connection.

        If the initial connection attempt failed, or the connection has since
        been closed, one reconnect is attempted so that a single failure does
        not disable the singleton for the rest of the process.

        Returns:
            The open psycopg2 connection, or ``None`` if connecting failed.
        """
        # psycopg2 sets ``closed`` to a non-zero value once the connection has
        # been closed or is known to be broken.
        if self.connection is None or self.connection.closed:
            self.connection = self._create_connection()
        return self.connection

    def close_connection(self):
        """Closes the database connection"""
        if self.connection:
            self.connection.close()
            print("Database connection closed.")


def sql_query_agent(query: str):
    """
    Executes a SQL query on the Supabase PostgreSQL database.

    The statement runs on the shared ``DatabaseConnection`` singleton. This
    function never raises: every failure is printed and reported to the
    caller as a generic message.

    NOTE(security): ``query`` is sent to the server verbatim. If any part of
    it can come from untrusted input, it must be validated or parameterized
    before it reaches this function.

    Args:
        query (str): The SQL query to execute.

    Returns:
        str: Formatted results or error message.
    """
    failure_message = "⚠️ An error occurred. Please try again later."
    print(f"Executing SQL Query: {query}")

    conn = None
    try:
        conn = DatabaseConnection().get_connection()
        if conn is None:
            print("Error querying database: Database connection error")
            return failure_message

        with conn.cursor() as cur:
            cur.execute(query)
            results = cur.fetchall()

        if not results:
            return "🔍 No data found."
        return f"Here are the results:\n{str(results)}"
    except Exception as e:
        print(f"Error querying database: {e}")
        # A failed statement leaves the transaction aborted. Because the
        # connection is shared, every later query would be rejected until the
        # transaction is rolled back.
        if conn is not None and not conn.closed:
            try:
                conn.rollback()
            except psycopg2.Error as rollback_error:
                print(f"Error rolling back transaction: {rollback_error}")
        return failure_message