# AI-Agent-Book / utils / db_manager.py
# Origin: "init project" commit ded29b0 (author: Cuong2004)
import os
import psycopg2
import pandas as pd
import io
from dotenv import load_dotenv
import logging
import sys
from datetime import datetime
import json
# Configure logging to stdout only.
# NOTE(review): basicConfig at import time configures the root logger for the
# whole process — any module importing this one inherits this handler setup.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout)
    ]
)
# Module-level logger used by every helper below.
logger = logging.getLogger(__name__)
# Load environment variables from a .env file (if present) into os.environ.
load_dotenv()
# Database configuration: PostgreSQL DSN; may be None if the env var is unset.
DATABASE_URL = os.environ.get('DATABASE_URL')
def get_db_connection():
    """
    Create a connection to the PostgreSQL database.

    Returns:
        Connection: An open psycopg2 connection.

    Raises:
        RuntimeError: If the DATABASE_URL environment variable is not set.
        Exception: Re-raised psycopg2 connection errors (logged first).
    """
    # Fail fast with a clear message instead of handing None to
    # psycopg2.connect, which raises a confusing low-level error.
    if not DATABASE_URL:
        raise RuntimeError("DATABASE_URL environment variable is not set")
    try:
        conn = psycopg2.connect(DATABASE_URL)
        return conn
    except Exception as e:
        logger.error(f"Error connecting to database: {str(e)}")
        raise
def init_db():
    """
    Initialize the database schema.

    Creates the ``files`` table (binary content plus JSONB metadata) if it
    does not already exist. Rolls back on failure and always closes the
    connection.
    """
    connection = None
    try:
        connection = get_db_connection()
        cur = connection.cursor()
        # Idempotent DDL: safe to run on every startup.
        cur.execute('''
        CREATE TABLE IF NOT EXISTS files (
        id SERIAL PRIMARY KEY,
        filename TEXT NOT NULL,
        file_type TEXT NOT NULL,
        content BYTEA NOT NULL,
        created_at TIMESTAMP NOT NULL DEFAULT NOW(),
        metadata JSONB
        )
        ''')
        connection.commit()
        logger.info("Database initialized successfully")
    except Exception as exc:
        logger.error(f"Error initializing database: {str(exc)}")
        if connection:
            connection.rollback()
        raise
    finally:
        if connection:
            connection.close()
def save_data_to_db(filename, content, file_type, metadata=None):
    """
    Save data directly to the database.

    Args:
        filename: Name of the file.
        content: Binary content of the file.
        file_type: Type of file (e.g., 'preprocessed', 'categorized').
        metadata: Optional dict of additional metadata to store.

    Returns:
        int: ID of the saved row in the ``files`` table.

    Raises:
        Exception: Re-raised database errors (transaction rolled back first).
    """
    conn = None
    try:
        # Copy the caller's dict so adding the timestamp below does not
        # mutate the argument they passed in.
        metadata = dict(metadata) if metadata else {}
        # Add timestamp to metadata
        metadata['saved_at'] = datetime.now().isoformat()
        # Connect to database and save file
        conn = get_db_connection()
        cursor = conn.cursor()
        cursor.execute(
            '''
            INSERT INTO files (filename, file_type, content, metadata)
            VALUES (%s, %s, %s, %s)
            RETURNING id
            ''',
            (filename, file_type, psycopg2.Binary(content), json.dumps(metadata))
        )
        file_id = cursor.fetchone()[0]
        conn.commit()
        # Log the actual filename (was previously a garbled placeholder).
        logger.info(f"File {filename} saved to database with ID {file_id}")
        return file_id
    except Exception as e:
        logger.error(f"Error saving file to database: {str(e)}")
        if conn:
            conn.rollback()
        raise
    finally:
        if conn:
            conn.close()
def get_file_from_db(file_id=None, file_type=None, latest=True):
    """
    Retrieve a file from the database by ID or type.

    Args:
        file_id: ID of the file to retrieve.
        file_type: Type of file to retrieve (used if file_id is None).
        latest: If True, get the most recent file of this type
            (used if file_id is None).

    Returns:
        dict | None: Dictionary with keys 'id', 'filename', 'content',
        'file_type', and 'metadata', or None when nothing matches (or when
        neither file_id nor file_type was given).

    Raises:
        Exception: Re-raised database errors (logged first).
    """
    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()
        if file_id is not None:
            cursor.execute(
                '''
                SELECT id, filename, content, file_type, metadata
                FROM files
                WHERE id = %s
                ''',
                (file_id,)
            )
        elif file_type is not None:
            # Get the most recent file of the specified type
            if latest:
                cursor.execute(
                    '''
                    SELECT id, filename, content, file_type, metadata
                    FROM files
                    WHERE file_type = %s
                    ORDER BY created_at DESC
                    LIMIT 1
                    ''',
                    (file_type,)
                )
            else:
                # NOTE: only fetchone() is called below, so without LIMIT the
                # row returned here is whichever the database yields first.
                cursor.execute(
                    '''
                    SELECT id, filename, content, file_type, metadata
                    FROM files
                    WHERE file_type = %s
                    ''',
                    (file_type,)
                )
        else:
            logger.warning("Neither file_id nor file_type provided for database query")
            return None
        result = cursor.fetchone()
        if result:
            file_id, filename, content, file_type, metadata = result
            # Process metadata properly: psycopg2 may hand JSONB back as a
            # dict already, or as a JSON string depending on adapters.
            if metadata:
                if isinstance(metadata, dict):
                    metadata_dict = metadata
                else:
                    # Otherwise, try to parse it as JSON
                    try:
                        metadata_dict = json.loads(metadata)
                    except (TypeError, ValueError):
                        logger.warning(f"Failed to parse metadata for file {file_id}. Using empty dict.")
                        metadata_dict = {}
            else:
                metadata_dict = {}
            file_data = {
                'id': file_id,
                'filename': filename,
                'content': content,
                'file_type': file_type,
                'metadata': metadata_dict
            }
            # Log the actual filename (was previously a garbled placeholder).
            logger.info(f"File {filename} (ID: {file_id}) retrieved from database")
            return file_data
        else:
            if file_id is not None:
                logger.warning(f"No file found with ID: {file_id}")
            else:
                logger.warning(f"No files found with type: {file_type}")
            return None
    except Exception as e:
        logger.error(f"Error retrieving file from database: {str(e)}")
        raise
    finally:
        if conn:
            conn.close()
def get_dataframe_from_db(file_id=None, file_type=None, latest=True):
    """
    Retrieve a stored file and parse it into a pandas DataFrame.

    Args:
        file_id: ID of the file to retrieve.
        file_type: Type of file to retrieve (used if file_id is None).
        latest: If True, get the most recent file of this type
            (used if file_id is None).

    Returns:
        DataFrame | None: The parsed CSV content, or None when no matching
        file was found.
    """
    record = get_file_from_db(file_id, file_type, latest)
    # Guard clause: nothing found, or the row carries no content key.
    if not (record and 'content' in record):
        return None
    # Stored bytes are CSV; wrap in a buffer so pandas can stream-parse them.
    return pd.read_csv(io.BytesIO(record['content']))
# Specific functions for different file types
def save_processed_data(filename, content, file_type='preprocessed'):
    """Persist preprocessed data; thin wrapper over save_data_to_db."""
    return save_data_to_db(filename=filename, content=content, file_type=file_type)
def save_categorized_data(filename, content, file_type='categorized'):
    """Persist categorized data; thin wrapper over save_data_to_db."""
    return save_data_to_db(filename=filename, content=content, file_type=file_type)
def save_analysis_results(niche_data, subniche_data):
    """
    Persist both analysis result files.

    Returns:
        tuple[int, int]: (niche_id, subniche_id) of the saved rows.
    """
    ids = (
        save_data_to_db("Niche_Ranking_Analysis.csv", niche_data, 'niche_analysis'),
        save_data_to_db("Subniche_Analysis.csv", subniche_data, 'subniche_analysis'),
    )
    return ids
def save_generated_titles(filename, content):
    """Persist generated titles; thin wrapper over save_data_to_db."""
    return save_data_to_db(filename, content, file_type='generated')