Spaces:
Sleeping
Sleeping
| import os | |
| import psycopg2 | |
| import pandas as pd | |
| import io | |
| from dotenv import load_dotenv | |
| import logging | |
| import sys | |
| from datetime import datetime | |
| import json | |
# Route all log output to stdout only (no file handlers) so platform
# log collectors pick it up.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger(__name__)

# Pull configuration from a local .env file when one is present.
load_dotenv()

# PostgreSQL connection string; None if the variable is not set.
DATABASE_URL = os.environ.get('DATABASE_URL')
def get_db_connection():
    """
    Open a new connection to the PostgreSQL database.

    Uses the module-level ``DATABASE_URL`` connection string.

    Returns:
        Connection: a live psycopg2 connection.

    Raises:
        Exception: re-raised after logging if the connection attempt fails.
    """
    try:
        return psycopg2.connect(DATABASE_URL)
    except Exception as e:
        logger.error(f"Error connecting to database: {str(e)}")
        raise
def init_db():
    """
    Initialize the database with the required tables.

    Creates the ``files`` table (id, filename, file_type, binary content,
    creation timestamp, JSONB metadata) if it does not already exist.

    Raises:
        Exception: re-raised after logging and rollback if initialization fails.
    """
    conn = None
    try:
        conn = get_db_connection()
        # Use the cursor as a context manager so it is closed even if
        # execute() raises (the original leaked the cursor on error).
        with conn.cursor() as cursor:
            # Create files table to store file contents
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS files (
                    id SERIAL PRIMARY KEY,
                    filename TEXT NOT NULL,
                    file_type TEXT NOT NULL,
                    content BYTEA NOT NULL,
                    created_at TIMESTAMP NOT NULL DEFAULT NOW(),
                    metadata JSONB
                )
            ''')
        conn.commit()
        logger.info("Database initialized successfully")
    except Exception as e:
        logger.error(f"Error initializing database: {str(e)}")
        if conn:
            conn.rollback()
        raise
    finally:
        if conn:
            conn.close()
def save_data_to_db(filename, content, file_type, metadata=None):
    """
    Save a file's binary content directly to the database.

    Args:
        filename: Name of the file.
        content: Binary content of the file.
        file_type: Type of file (e.g., 'preprocessed', 'categorized').
        metadata: Additional metadata to store; the caller's dict is NOT
            mutated (a copy is taken before the timestamp is added).

    Returns:
        int: ID of the saved row in the ``files`` table.

    Raises:
        Exception: re-raised after logging and rollback on failure.
    """
    conn = None
    try:
        # Copy so the caller's dict is not mutated when we stamp it
        # (the original wrote 'saved_at' into the caller's object).
        metadata = dict(metadata) if metadata else {}
        # Record when this row was written.
        metadata['saved_at'] = datetime.now().isoformat()
        # Connect to database and save file
        conn = get_db_connection()
        with conn.cursor() as cursor:
            cursor.execute(
                '''
                INSERT INTO files (filename, file_type, content, metadata)
                VALUES (%s, %s, %s, %s)
                RETURNING id
                ''',
                (filename, file_type, psycopg2.Binary(content), json.dumps(metadata))
            )
            file_id = cursor.fetchone()[0]
        conn.commit()
        # NOTE(review): the original log literal was garbled by extraction
        # ("File (unknown) saved..."); the filename is interpolated here.
        logger.info(f"File {filename} saved to database with ID {file_id}")
        return file_id
    except Exception as e:
        logger.error(f"Error saving file to database: {str(e)}")
        if conn:
            conn.rollback()
        raise
    finally:
        if conn:
            conn.close()
def get_file_from_db(file_id=None, file_type=None, latest=True):
    """
    Retrieve a file from the database by ID or type.

    Args:
        file_id: ID of the file to retrieve.
        file_type: Type of file to retrieve (used if file_id is None).
        latest: If True, get the most recent file of this type
            (used if file_id is None).

    Returns:
        dict | None: ``{'id', 'filename', 'content', 'file_type', 'metadata'}``
        for the matched row, or None when nothing matches (or neither
        ``file_id`` nor ``file_type`` was given).

    Raises:
        Exception: re-raised after logging on any database error.
    """
    conn = None
    try:
        conn = get_db_connection()
        # Context manager closes the cursor even on error
        # (the original leaked it).
        with conn.cursor() as cursor:
            if file_id is not None:
                cursor.execute(
                    '''
                    SELECT id, filename, content, file_type, metadata
                    FROM files
                    WHERE id = %s
                    ''',
                    (file_id,)
                )
            elif file_type is not None:
                if latest:
                    # Most recent file of the specified type.
                    cursor.execute(
                        '''
                        SELECT id, filename, content, file_type, metadata
                        FROM files
                        WHERE file_type = %s
                        ORDER BY created_at DESC
                        LIMIT 1
                        ''',
                        (file_type,)
                    )
                else:
                    # No ordering: fetchone() below returns an arbitrary match.
                    cursor.execute(
                        '''
                        SELECT id, filename, content, file_type, metadata
                        FROM files
                        WHERE file_type = %s
                        ''',
                        (file_type,)
                    )
            else:
                logger.warning("Neither file_id nor file_type provided for database query")
                return None
            result = cursor.fetchone()
        if result:
            file_id, filename, content, file_type, metadata = result
            # psycopg2 may hand JSONB back already decoded as a dict;
            # otherwise fall back to parsing the raw JSON text.
            if metadata:
                if isinstance(metadata, dict):
                    metadata_dict = metadata
                else:
                    try:
                        metadata_dict = json.loads(metadata)
                    except (TypeError, ValueError):
                        logger.warning(f"Failed to parse metadata for file {file_id}. Using empty dict.")
                        metadata_dict = {}
            else:
                metadata_dict = {}
            file_data = {
                'id': file_id,
                'filename': filename,
                'content': content,
                'file_type': file_type,
                'metadata': metadata_dict
            }
            # NOTE(review): original log literal was garbled ("File (unknown)");
            # the filename is interpolated here.
            logger.info(f"File {filename} (ID: {file_id}) retrieved from database")
            return file_data
        else:
            if file_id is not None:
                logger.warning(f"No file found with ID: {file_id}")
            else:
                logger.warning(f"No files found with type: {file_type}")
            return None
    except Exception as e:
        logger.error(f"Error retrieving file from database: {str(e)}")
        raise
    finally:
        if conn:
            conn.close()
def get_dataframe_from_db(file_id=None, file_type=None, latest=True):
    """
    Retrieve a stored CSV file from the database as a pandas DataFrame.

    Args:
        file_id: ID of the file to retrieve.
        file_type: Type of file to retrieve (used if file_id is None).
        latest: If True, get the most recent file of this type
            (used if file_id is None).

    Returns:
        DataFrame | None: the parsed DataFrame, or None when no matching
        file (or no content) was found.
    """
    record = get_file_from_db(file_id, file_type, latest)
    # Guard clause: nothing matched, or the row carried no content key.
    if not record or 'content' not in record:
        return None
    # The stored bytes are CSV; parse them straight from memory.
    return pd.read_csv(io.BytesIO(record['content']))
# Specific functions for different file types
def save_processed_data(filename, content, file_type='preprocessed'):
    """Persist preprocessed data straight to the database; returns the row ID."""
    return save_data_to_db(filename=filename, content=content, file_type=file_type)
def save_categorized_data(filename, content, file_type='categorized'):
    """Persist categorized data straight to the database; returns the row ID."""
    return save_data_to_db(filename=filename, content=content, file_type=file_type)
def save_analysis_results(niche_data, subniche_data):
    """
    Persist niche and subniche analysis outputs to the database.

    Returns:
        tuple[int, int]: (niche_id, subniche_id) row IDs, in that order.
    """
    return (
        save_data_to_db("Niche_Ranking_Analysis.csv", niche_data, 'niche_analysis'),
        save_data_to_db("Subniche_Analysis.csv", subniche_data, 'subniche_analysis'),
    )
def save_generated_titles(filename, content):
    """Persist generated titles straight to the database; returns the row ID."""
    return save_data_to_db(filename=filename, content=content, file_type='generated')