"""PostgreSQL-backed storage helpers for pipeline files.

Files are stored as raw bytes in a single ``files`` table together with a
type tag and optional JSON metadata, and can be read back either as raw
records or as pandas DataFrames (CSV content).
"""

import io
import json
import logging
import os
import sys
from datetime import datetime

import pandas as pd
import psycopg2
from dotenv import load_dotenv

# Configure logging to stdout only
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger(__name__)

# Load environment variables
load_dotenv()

# Database configuration
DATABASE_URL = os.environ.get('DATABASE_URL')


def get_db_connection():
    """
    Create a connection to the PostgreSQL database.

    The connection string is read from the module-level ``DATABASE_URL``.

    Returns:
        Connection: The database connection. The caller is responsible
        for closing it.

    Raises:
        Exception: Whatever ``psycopg2.connect`` raises; the error is logged
        and re-raised unchanged.
    """
    try:
        return psycopg2.connect(DATABASE_URL)
    except Exception as e:
        logger.error(f"Error connecting to database: {str(e)}")
        raise


def init_db():
    """
    Initialize the database with required tables.

    Creates the ``files`` table if it does not exist yet. On failure the
    transaction is rolled back and the exception is re-raised.
    """
    conn = None
    try:
        conn = get_db_connection()
        # The cursor context manager closes the cursor on exit.
        with conn.cursor() as cursor:
            # Create files table to store file contents
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS files (
                    id SERIAL PRIMARY KEY,
                    filename TEXT NOT NULL,
                    file_type TEXT NOT NULL,
                    content BYTEA NOT NULL,
                    created_at TIMESTAMP NOT NULL DEFAULT NOW(),
                    metadata JSONB
                )
            ''')
        conn.commit()
        logger.info("Database initialized successfully")
    except Exception as e:
        logger.error(f"Error initializing database: {str(e)}")
        if conn:
            conn.rollback()
        raise
    finally:
        if conn:
            conn.close()


def save_data_to_db(filename, content, file_type, metadata=None):
    """
    Save data directly to the database.

    Args:
        filename: Name of the file
        content: Binary content of the file
        file_type: Type of file (e.g., 'preprocessed', 'categorized')
        metadata: Additional metadata to store. The mapping passed in is
            not modified; a copy with an added ``saved_at`` ISO timestamp
            is what gets stored.

    Returns:
        int: ID of the saved file

    Raises:
        Exception: Any error raised while preparing or inserting the row;
        the transaction is rolled back, the error logged and re-raised.
    """
    conn = None
    try:
        # Work on a copy so the caller's dict is never mutated.
        stored_metadata = {} if metadata is None else dict(metadata)

        # Add timestamp to metadata
        stored_metadata['saved_at'] = datetime.now().isoformat()

        # Connect to database and save file
        conn = get_db_connection()
        with conn.cursor() as cursor:
            cursor.execute(
                '''
                INSERT INTO files (filename, file_type, content, metadata)
                VALUES (%s, %s, %s, %s)
                RETURNING id
                ''',
                (
                    filename,
                    file_type,
                    psycopg2.Binary(content),
                    json.dumps(stored_metadata),
                )
            )
            file_id = cursor.fetchone()[0]
        conn.commit()

        logger.info(f"File {filename} saved to database with ID {file_id}")
        return file_id
    except Exception as e:
        logger.error(f"Error saving file to database: {str(e)}")
        if conn:
            conn.rollback()
        raise
    finally:
        if conn:
            conn.close()


def _parse_metadata(raw_metadata, row_id):
    """Return the stored metadata as a dict, falling back to ``{}``."""
    if not raw_metadata:
        return {}

    # If metadata is already a dict, use it directly
    if isinstance(raw_metadata, dict):
        return raw_metadata

    # Otherwise, try to parse it as JSON
    try:
        return json.loads(raw_metadata)
    except (TypeError, ValueError):
        logger.warning(f"Failed to parse metadata for file {row_id}. Using empty dict.")
        return {}


def get_file_from_db(file_id=None, file_type=None, latest=True):
    """
    Retrieve a file from the database by ID or type.

    Args:
        file_id: ID of the file to retrieve
        file_type: Type of file to retrieve (used if file_id is None)
        latest: If True, get the most recent file of this type; if False,
            get the oldest one (used if file_id is None)

    Returns:
        dict: Dictionary with the keys ``id``, ``filename``, ``content``,
        ``file_type`` and ``metadata``, or None when nothing matches or
        when neither ``file_id`` nor ``file_type`` is given. ``content`` is
        returned exactly as the driver delivers it (psycopg2 may hand back
        BYTEA as a ``memoryview``).

    Raises:
        Exception: Any database error; it is logged and re-raised.
    """
    # Validate the arguments before paying for a database connection.
    if file_id is None and file_type is None:
        logger.warning("Neither file_id nor file_type provided for database query")
        return None

    if file_id is not None:
        query = '''
            SELECT id, filename, content, file_type, metadata
            FROM files
            WHERE id = %s
        '''
        params = (file_id,)
    elif latest:
        # Get the most recent file of the specified type; id breaks ties
        # between rows sharing the same created_at.
        query = '''
            SELECT id, filename, content, file_type, metadata
            FROM files
            WHERE file_type = %s
            ORDER BY created_at DESC, id DESC
            LIMIT 1
        '''
        params = (file_type,)
    else:
        # Only one row is consumed, so order deterministically (oldest
        # first) and limit the result instead of transferring every blob
        # of this type and keeping an arbitrary one.
        query = '''
            SELECT id, filename, content, file_type, metadata
            FROM files
            WHERE file_type = %s
            ORDER BY created_at ASC, id ASC
            LIMIT 1
        '''
        params = (file_type,)

    conn = None
    try:
        conn = get_db_connection()
        with conn.cursor() as cursor:
            cursor.execute(query, params)
            row = cursor.fetchone()

        if not row:
            if file_id is not None:
                logger.warning(f"No file found with ID: {file_id}")
            else:
                logger.warning(f"No files found with type: {file_type}")
            return None

        # Separate names keep the function parameters intact.
        row_id, row_filename, row_content, row_type, row_metadata = row

        file_data = {
            'id': row_id,
            'filename': row_filename,
            'content': row_content,
            'file_type': row_type,
            'metadata': _parse_metadata(row_metadata, row_id)
        }

        logger.info(f"File {row_filename} (ID: {row_id}) retrieved from database")
        return file_data
    except Exception as e:
        logger.error(f"Error retrieving file from database: {str(e)}")
        raise
    finally:
        if conn:
            conn.close()


def get_dataframe_from_db(file_id=None, file_type=None, latest=True):
    """
    Retrieve a dataframe from the database.

    The stored content is parsed as CSV.

    Args:
        file_id: ID of the file to retrieve
        file_type: Type of file to retrieve (used if file_id is None)
        latest: If True, get the most recent file of this type
            (used if file_id is None)

    Returns:
        DataFrame: pandas DataFrame from the retrieved file, or None when
        no matching file exists.
    """
    file_data = get_file_from_db(file_id, file_type, latest)
    if not file_data or 'content' not in file_data:
        return None

    # Convert the binary data to a pandas DataFrame
    return pd.read_csv(io.BytesIO(file_data['content']))


# Specific functions for different file types
def save_processed_data(filename, content, file_type='preprocessed'):
    """Save processed data directly to database and return the new row ID."""
    return save_data_to_db(filename, content, file_type)


def save_categorized_data(filename, content, file_type='categorized'):
    """Save categorized data directly to database and return the new row ID."""
    return save_data_to_db(filename, content, file_type)


def save_analysis_results(niche_data, subniche_data):
    """
    Save analysis results directly to database.

    The two files are written by two separate ``save_data_to_db`` calls,
    each with its own connection and commit, so a failure on the second
    write does not undo the first.

    Args:
        niche_data: Binary content stored as "Niche_Ranking_Analysis.csv"
        subniche_data: Binary content stored as "Subniche_Analysis.csv"

    Returns:
        tuple: (niche_id, subniche_id) of the saved rows
    """
    niche_id = save_data_to_db("Niche_Ranking_Analysis.csv", niche_data, 'niche_analysis')
    subniche_id = save_data_to_db("Subniche_Analysis.csv", subniche_data, 'subniche_analysis')
    return niche_id, subniche_id


def save_generated_titles(filename, content):
    """Save generated titles directly to database and return the new row ID."""
    return save_data_to_db(filename, content, 'generated')