""" This class create a connection to Snowflake, run queries (read and write) """ import json import numpy as np import pandas as pd from snowflake.snowpark import Session from sympy.strategies.branch import condition class SnowFlakeConn: def __init__(self, session, brand): self. session = session self.brand = brand self.final_columns = ['user_id', "email", "user_info", "permission", "expiration_date", "recsys_result", "message", "brand", "recommendation", "segment_name", "timestamp"] self.campaign_id = { "singeo": 460, "pianote": 457, "guitareo": 458, "drumeo": 392 } # --------------------------------------------------------------- # --------------------------------------------------------------- def run_read_query(self, query, data): """ Executes a SQL query on Snowflake that fetch the data :return: Pandas dataframe containing the query results """ # Connect to Snowflake try: dataframe = self.session.sql(query).to_pandas() dataframe.columns = dataframe.columns.str.lower() print(f"reading {data} table successfully") return dataframe except Exception as e: print(f"Error in creating/updating table: {e}") # --------------------------------------------------------------- # --------------------------------------------------------------- def is_json_parsed_to_collection(self, s): try: parsed = json.loads(s) return isinstance(parsed, (dict, list)) except: return False # --------------------------------------------------------------- # --------------------------------------------------------------- def store_df_to_snowflake(self, table_name, dataframe, database="ONLINE_RECSYS", schema="GENERATED_DATA"): """ Executes a SQL query on Snowflake that write the preprocessed data on new tables :param query: SQL query string to be executed :return: None """ try: self.session.use_database(database) self.session.use_schema(schema) dataframe = dataframe.reset_index(drop=True) dataframe.columns = dataframe.columns.str.upper() self.session.write_pandas(df=dataframe, table_name=table_name.strip().upper(), auto_create_table=True, overwrite=True, use_logical_type=True) print(f"Data inserted into {table_name} successfully.") except Exception as e: print(f"Error in creating/updating/inserting table: {e}") # --------------------------------------------------------------- # --------------------------------------------------------------- def get_data(self, data, list_of_ids=None): """ valid Data is = {users, contents, interactions, recsys, popular_contents} :param data: :return: """ valid_data = {'users', 'contents', 'interactions', 'recsys', 'popular_contents'} if data not in valid_data: raise ValueError(f"Invalid data type: {data}") # Construct the method name based on the input method_name = f"_get_{data}" # Retrieve the method dynamically method = getattr(self, method_name, None) if method is None: raise NotImplementedError(f"The method {method_name} is not implemented.") query = method(list_of_ids) data = self.run_read_query(query, data) return data # --------------------------------------------------------------- # --------------------------------------------------------------- def _get_contents(self, list_of_ids=None): query = f""" select CONTENT_ID, CONTENT_TYPE, CONTENT_PROFILE as content_info --, CONTENT_PROFILE_VECTOR from ONLINE_RECSYS.VECTOR_DB.VECTORIZED_CONTENT where BRAND = '{self.brand}' """ return query # --------------------------------------------------------------- # --------------------------------------------------------------- def _get_users(self, list_of_ids=None): if list_of_ids is not None: ids_str = "(" + ", ".join(map(str, list_of_ids)) + ")" condition = f"AND USER_ID in {ids_str}" else : condition = "" query = f""" select USER_ID, BRAND, FIRST_NAME, BIRTHDAY, TIMEZONE, EMAIL, CURRENT_TIMESTAMP() AS TIMESTAMP, DIFFICULTY, SELF_REPORT_DIFFICULTY, USER_PROFILE as user_info, PERMISSION, EXPIRATION_DATE, DATEDIFF( day, CURRENT_DATE(), CASE WHEN DATE_FROM_PARTS(YEAR(CURRENT_DATE()), EXTRACT(MONTH FROM BIRTHDAY), EXTRACT(DAY FROM BIRTHDAY)) < CURRENT_DATE() THEN DATE_FROM_PARTS(YEAR(CURRENT_DATE()) + 1, EXTRACT(MONTH FROM BIRTHDAY), EXTRACT(DAY FROM BIRTHDAY)) ELSE DATE_FROM_PARTS(YEAR(CURRENT_DATE()), EXTRACT(MONTH FROM BIRTHDAY), EXTRACT(DAY FROM BIRTHDAY)) END) AS birthday_reminder from ONLINE_RECSYS.PREPROCESSED.USERS where BRAND = '{self.brand}' {condition} """ return query # --------------------------------------------------------------- # --------------------------------------------------------------- def _get_interactions(self, list_of_ids=None): if list_of_ids is not None: ids_str = "(" + ", ".join(map(str, list_of_ids)) + ")" condition = f"AND USER_ID in {ids_str}" else : condition = "" query = f""" WITH latest_interactions AS( SELECT USER_ID, CONTENT_ID, CONTENT_TYPE, EVENT_TEXT, TIMESTAMP, ROW_NUMBER() OVER(PARTITION BY USER_ID ORDER BY TIMESTAMP DESC) AS rn FROM ONLINE_RECSYS.PREPROCESSED.RECSYS_INTEACTIONS WHERE BRAND = '{self.brand}' AND EVENT_TEXT IN('Video Completed', 'Video Playing') {condition}) SELECT i.USER_ID, i.CONTENT_ID, i.CONTENT_TYPE, c.content_profile as last_completed_content, i.EVENT_TEXT, i.TIMESTAMP, DATEDIFF('week', i.TIMESTAMP, CURRENT_TIMESTAMP) AS weeks_since_last_interaction FROM latest_interactions i LEFT JOIN ONLINE_RECSYS.VECTOR_DB.VECTORIZED_CONTENT c ON c.CONTENT_ID = i.CONTENT_ID WHERE rn = 1; """ return query # --------------------------------------------------------------- # --------------------------------------------------------------- def _get_recsys(self, list_of_ids=None): if list_of_ids is not None: ids_str = "(" + ", ".join(map(str, list_of_ids)) + ")" condition = f"WHERE USER_ID in {ids_str}" else : condition = "" recsys_col = f"{self.brand}_recsys_v3" query = f""" select USER_ID, {recsys_col} as recsys_result from RECSYS_V3.RECSYS_CIO.RECSYS_V3_CUSTOMER_IO_OLD {condition} """ return query # --------------------------------------------------------------- # --------------------------------------------------------------- def _get_popular_contents(self, list_of_ids=None): query = f""" select POPULAR_CONTENT from RECSYS_V3.RECSYS_CIO.POPULAR_CONTENT_CUSTOMER_IO_OLD where brand = '{self.brand.lower()}' """ return query # --------------------------------------------------------------- # --------------------------------------------------------------- def extract_id_from_email(self, emails): """ extracting user_ids from emails :param unique_emails: :return: """ email_list_str = ', '.join(f"'{email}'" for email in emails) query = f""" SELECT id as USER_ID, email as EMAIL FROM STITCH.MUSORA_ECOM_DB.USORA_USERS WHERE email IN ({email_list_str}) """ user_ids_df = self.run_read_query(query, data="User_ids") return user_ids_df # --------------------------------------------------------------- # --------------------------------------------------------------- def adjust_dataframe(self, dataframe): """ Filter dataframe to only include the columns in self.final_columns. Add any missing columns with None values. Ensure the final order is consistent with self.final_columns. """ # Work with a copy so that we don't modify the original input final_df = dataframe.copy() # Normalize column names to lower-case for matching (if needed) final_df.columns = final_df.columns.str.lower() expected_cols = [col.lower() for col in self.final_columns] # Keep only those columns in the expected list available = [col for col in final_df.columns if col in expected_cols] final_df = final_df[available] # Add missing columns with None values for col in expected_cols: if col not in final_df.columns: final_df[col] = None # Reorder the columns to the desired order final_df = final_df[expected_cols] # If you need the column names to match exactly what self.final_columns provides (case-sensitive), # you can rename them accordingly. rename_mapping = {col.lower(): col for col in self.final_columns} final_df.rename(columns=rename_mapping, inplace=True) return final_df # --------------------------------------------------------------- # --------------------------------------------------------------- def close_connection(self): self.session.close() def get_inactive_users_by_brand(self, brand): if brand == "singeo": return self.get_users_in_campaign(brand=brand) else: return self._get_inactive_users(brand=brand) # =============================================================== def get_users_in_campaign(self, brand, campaign_name="Singeo - Inactive Members (for 3 days) - Re-engagement", stage=1, campaign_view='singeo_re_engagement'): """ creating a query to fetch requested users :param campaign_view: :param stage: :param campaign_name: :param brand: :return: """ # camp_id = self.campaign_id[brand] if stage == 1: query = f""" WITH eligible_users AS ( SELECT s.USER_ID, s.EMAIL, s.BRAND, s.CAMPAIGN_NAME FROM MESSAGING_SYSTEM.CAMPAIGNS.{campaign_view} s WHERE s.CAMPAIGN_NAME = '{campaign_name}' AND s.BRAND = '{brand}' ), msgs AS ( -- Only messages from the same brand/campaign SELECT m.EMAIL, m.BRAND, m.CAMPAIGN_NAME, m.STAGE, m.TIMESTAMP FROM MESSAGING_SYSTEM.GENERATED_DATA.INITIAL_MESSAGES m WHERE m.CAMPAIGN_NAME = '{campaign_name}' AND m.BRAND = '{brand}' ), no_stage1 AS ( -- Users who do not already have stage 1 SELECT e.* FROM eligible_users e LEFT JOIN msgs m1 ON m1.EMAIL = e.EMAIL AND m1.STAGE = 1 WHERE m1.EMAIL IS NULL ) SELECT n.EMAIL, FROM no_stage1 n ; """ else: query = f""" WITH eligible_users AS ( SELECT s.USER_ID, s.EMAIL, s.BRAND, s.CAMPAIGN_NAME FROM MESSAGING_SYSTEM.CAMPAIGNS.{campaign_view} s WHERE s.CAMPAIGN_NAME = '{campaign_name}' AND s.BRAND = '{brand}' ), msgs AS ( -- Only messages from the same brand/campaign SELECT m.EMAIL, m.BRAND, m.CAMPAIGN_NAME, m.STAGE, m.TIMESTAMP FROM MESSAGING_SYSTEM.GENERATED_DATA.INITIAL_MESSAGES m WHERE m.CAMPAIGN_NAME = '{campaign_name}' AND m.BRAND = '{brand}' ), has_prev AS ( -- Users who have the immediately previous stage (N-1) SELECT e.* FROM eligible_users e JOIN msgs mp ON mp.EMAIL = e.EMAIL AND mp.STAGE = {stage} - 1 ), no_current_stage AS ( -- Exclude users who already have the current stage (N) SELECT e.* FROM has_prev e LEFT JOIN msgs mn ON mn.EMAIL = e.EMAIL AND mn.STAGE = {stage} WHERE mn.EMAIL IS NULL ), latest_msg AS ( -- Most-recent message timestamp per user (within this brand/campaign) SELECT EMAIL, MAX(TIMESTAMP) AS LAST_TS FROM msgs GROUP BY EMAIL ) SELECT n.EMAIL, FROM no_current_stage n JOIN latest_msg l ON l.EMAIL = n.EMAIL -- Ensure no messages for this user in this brand/campaign in the last 36 hours --WHERE l.LAST_TS < DATEADD(HOUR, -36, CURRENT_TIMESTAMP()) ; """ users_df = self.run_read_query(query, data=f"{brand}_campaign") return users_df def _get_inactive_users(self, brand: str): """ Return up to 50 USER_IDs in the requested brand whose last interaction is > 5 days ago, restricted to users that exist in ONLINE_RECSYS.PREPROCESSED.USERS. """ path = f"Data/{brand.lower()}_test.csv" users_df = pd.read_csv(path) return users_df