File size: 8,031 Bytes
ded29b0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
import os
import psycopg2
import pandas as pd
import io
from dotenv import load_dotenv
import logging
import sys
from datetime import datetime
import json

# Configure logging to stdout only: basicConfig is given a single
# StreamHandler bound to sys.stdout, INFO as the threshold level, and a
# "timestamp - logger name - level - message" line format.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout)
    ]
)
# Module-level logger (named after this module) used by the functions below.
logger = logging.getLogger(__name__)

# Load environment variables (presumably from a .env file via python-dotenv).
# This runs before DATABASE_URL is read so that any value it loads is visible.
load_dotenv()

# Database configuration: the value passed to psycopg2.connect() in
# get_db_connection(). os.environ.get returns None when DATABASE_URL is unset.
DATABASE_URL = os.environ.get('DATABASE_URL')

def get_db_connection():
    """
    Open a new PostgreSQL connection using the module-level DATABASE_URL.

    Returns:
        Connection: A freshly opened psycopg2 connection. Closing it is the
        caller's responsibility.

    Raises:
        Exception: Any error raised by psycopg2.connect is logged and then
        re-raised unchanged.
    """
    try:
        connection = psycopg2.connect(DATABASE_URL)
    except Exception as exc:
        logger.error(f"Error connecting to database: {str(exc)}")
        raise
    else:
        return connection

def init_db():
    """
    Create the tables this module relies on, if they do not exist yet.

    Currently that is the single ``files`` table, which holds each stored
    file's name, type, binary content, creation timestamp and JSONB metadata.

    Raises:
        Exception: Any connection or SQL error is logged, the transaction is
        rolled back (when a connection was obtained) and the error re-raised.
    """
    # DDL for the table that stores file contents (idempotent via IF NOT EXISTS).
    create_files_table = '''
            CREATE TABLE IF NOT EXISTS files (
                id SERIAL PRIMARY KEY,
                filename TEXT NOT NULL,
                file_type TEXT NOT NULL,
                content BYTEA NOT NULL,
                created_at TIMESTAMP NOT NULL DEFAULT NOW(),
                metadata JSONB
            )
        '''

    connection = None
    try:
        connection = get_db_connection()
        cur = connection.cursor()
        cur.execute(create_files_table)
        connection.commit()
        logger.info("Database initialized successfully")
    except Exception as exc:
        logger.error(f"Error initializing database: {str(exc)}")
        # Only roll back when the failure happened after connecting.
        if connection:
            connection.rollback()
        raise
    finally:
        # Always release the connection, on success and on failure alike.
        if connection:
            connection.close()

def save_data_to_db(filename, content, file_type, metadata=None):
    """
    Save data directly to the database as a new row in the ``files`` table.

    Args:
        filename: Name of the file
        content: Binary content of the file
        file_type: Type of file (e.g., 'preprocessed', 'categorized')
        metadata: Optional mapping of additional metadata to store. It is
            copied before use, so the caller's object is never modified.

    Returns:
        int: ID of the saved file

    Raises:
        Exception: Any error while preparing the metadata, connecting or
        inserting is logged, the transaction is rolled back (when a
        connection was obtained) and the error is re-raised.
    """
    conn = None
    try:
        # Work on a shallow copy: adding 'saved_at' to the dict the caller
        # passed in would leak this function's bookkeeping into their data
        # (and into any later call that reuses the same dict).
        stored_metadata = {} if metadata is None else dict(metadata)

        # Add timestamp to metadata
        stored_metadata['saved_at'] = datetime.now().isoformat()

        # Connect to database and save file
        conn = get_db_connection()
        # The cursor context manager closes the cursor once the id is read.
        with conn.cursor() as cursor:
            cursor.execute(
                '''
                INSERT INTO files (filename, file_type, content, metadata)
                VALUES (%s, %s, %s, %s)
                RETURNING id
                ''',
                (
                    filename,
                    file_type,
                    psycopg2.Binary(content),
                    json.dumps(stored_metadata),
                ),
            )
            # RETURNING id yields exactly one row holding the new primary key.
            file_id = cursor.fetchone()[0]
        conn.commit()

        logger.info(f"File {filename} saved to database with ID {file_id}")
        return file_id
    except Exception as e:
        logger.error(f"Error saving file to database: {str(e)}")
        if conn:
            conn.rollback()
        raise
    finally:
        if conn:
            conn.close()

def _metadata_to_dict(metadata, file_id):
    """Normalize a raw ``metadata`` column value into a dict (empty on failure)."""
    if not metadata:
        return {}
    # If metadata is already a dict, use it directly
    if isinstance(metadata, dict):
        return metadata
    # Otherwise, try to parse it as JSON
    try:
        return json.loads(metadata)
    except (TypeError, ValueError):
        logger.warning(f"Failed to parse metadata for file {file_id}. Using empty dict.")
        return {}


def get_file_from_db(file_id=None, file_type=None, latest=True):
    """
    Retrieve a single file from the database by ID or by type.

    Args:
        file_id: ID of the file to retrieve. Takes precedence over file_type.
        file_type: Type of file to retrieve (used if file_id is None)
        latest: Used only when selecting by file_type. If True, return the
            most recently created file of that type; if False, return the
            oldest one. Both orderings use ``id`` as a tie-breaker so the
            selected row is deterministic.

    Returns:
        dict: Dictionary with keys 'id', 'filename', 'content', 'file_type'
        and 'metadata' (always a dict unless the stored JSON parses to
        another type), or None when no selector was given or no row matched.

    Raises:
        Exception: Any connection or query error is logged and re-raised.
    """
    # Nothing to look up: bail out before opening a database connection.
    if file_id is None and file_type is None:
        logger.warning("Neither file_id nor file_type provided for database query")
        return None

    conn = None
    try:
        conn = get_db_connection()
        with conn.cursor() as cursor:
            if file_id is not None:
                cursor.execute(
                    '''
                    SELECT id, filename, content, file_type, metadata
                    FROM files
                    WHERE id = %s
                    ''',
                    (file_id,)
                )
            elif latest:
                # Get the most recent file of the specified type
                cursor.execute(
                    '''
                    SELECT id, filename, content, file_type, metadata
                    FROM files
                    WHERE file_type = %s
                    ORDER BY created_at DESC, id DESC
                    LIMIT 1
                    ''',
                    (file_type,)
                )
            else:
                # Without ORDER BY the database may return rows in any order,
                # so the row picked by fetchone() would be arbitrary. Select
                # the oldest file of this type explicitly instead.
                cursor.execute(
                    '''
                    SELECT id, filename, content, file_type, metadata
                    FROM files
                    WHERE file_type = %s
                    ORDER BY created_at ASC, id ASC
                    LIMIT 1
                    ''',
                    (file_type,)
                )

            row = cursor.fetchone()

        if row is None:
            if file_id is not None:
                logger.warning(f"No file found with ID: {file_id}")
            else:
                logger.warning(f"No files found with type: {file_type}")
            return None

        # Distinct local names keep the caller-supplied selectors intact.
        row_id, filename, content, row_type, raw_metadata = row

        file_data = {
            'id': row_id,
            'filename': filename,
            'content': content,
            'file_type': row_type,
            'metadata': _metadata_to_dict(raw_metadata, row_id)
        }

        logger.info(f"File {filename} (ID: {row_id}) retrieved from database")
        return file_data
    except Exception as e:
        logger.error(f"Error retrieving file from database: {str(e)}")
        raise
    finally:
        if conn:
            conn.close()

def get_dataframe_from_db(file_id=None, file_type=None, latest=True):
    """
    Load a stored file from the database and parse it as CSV.

    Args:
        file_id: ID of the file to retrieve
        file_type: Type of file to retrieve (used if file_id is None)
        latest: If True, get the most recent file of this type (used if file_id is None)

    Returns:
        DataFrame: pandas DataFrame parsed from the file's content, or None
        when get_file_from_db returns nothing usable.
    """
    record = get_file_from_db(file_id, file_type, latest)

    # Guard clause: no record, or a record without a 'content' entry.
    if not record or 'content' not in record:
        return None

    # Wrap the stored binary payload in a file-like object for pandas.
    raw_bytes = record['content']
    return pd.read_csv(io.BytesIO(raw_bytes))

# Specific functions for different file types
def save_processed_data(filename, content, file_type='preprocessed'):
    """
    Store processed data in the database.

    Thin wrapper around save_data_to_db whose file_type defaults to
    'preprocessed'.

    Returns:
        The value returned by save_data_to_db (the ID of the saved file).
    """
    saved_id = save_data_to_db(filename, content, file_type)
    return saved_id

def save_categorized_data(filename, content, file_type='categorized'):
    """
    Store categorized data in the database.

    Thin wrapper around save_data_to_db whose file_type defaults to
    'categorized'.

    Returns:
        The value returned by save_data_to_db (the ID of the saved file).
    """
    saved_id = save_data_to_db(filename, content, file_type)
    return saved_id

def save_analysis_results(niche_data, subniche_data):
    """
    Store both analysis outputs in the database.

    The niche ranking is saved first, then the subniche analysis, each under
    a fixed filename and file type.

    Returns:
        tuple: (niche_id, subniche_id) as returned by save_data_to_db.
    """
    # (filename, payload, file_type) for each result, in save order.
    to_save = (
        ("Niche_Ranking_Analysis.csv", niche_data, 'niche_analysis'),
        ("Subniche_Analysis.csv", subniche_data, 'subniche_analysis'),
    )
    saved_ids = tuple(
        save_data_to_db(name, payload, kind) for name, payload, kind in to_save
    )
    return saved_ids

def save_generated_titles(filename, content):
    """
    Store generated titles in the database.

    Thin wrapper around save_data_to_db that always uses the 'generated'
    file type.

    Returns:
        The value returned by save_data_to_db (the ID of the saved file).
    """
    saved_id = save_data_to_db(filename, content, file_type='generated')
    return saved_id