"""
This module defines SnowFlakeConn, which wraps an existing Snowpark session
to run read and write queries against Snowflake.
"""
| | import json |
| |
|
| | import numpy as np |
| | import pandas as pd |
| | from snowflake.snowpark import Session |
| | from sympy.strategies.branch import condition |
| |
|
| |
|
class SnowFlakeConn:
    """
    Brand-scoped helper around a Snowpark session.

    Builds SQL queries for one brand, runs them into pandas DataFrames and
    writes DataFrames back to Snowflake tables.
    """

    def __init__(self, session, brand):
        """
        :param session: Snowpark session used for every query and write.
        :param brand: brand name interpolated into the brand-scoped queries.
        """
        self.session = session
        self.brand = brand

        # Column layout produced by adjust_dataframe().
        self.final_columns = ['user_id', "email", "user_info", "permission", "expiration_date",
                              "recsys_result", "message", "brand", "recommendation",
                              "segment_name", "timestamp"]

        # Numeric campaign id per brand.
        self.campaign_id = {
            "singeo": 460,
            "pianote": 457,
            "guitareo": 458,
            "drumeo": 392,
        }

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _quote_literal(value):
        """
        Return *value* as a single-quoted Snowflake string literal.

        Backslashes and single quotes are escaped, because both are special
        inside Snowflake single-quoted strings. As a result, values such as
        ``o'brien@example.com`` cannot break out of the literal.
        """
        text = str(value).replace("\\", "\\\\").replace("'", "''")
        return f"'{text}'"

    @staticmethod
    def _id_condition(list_of_ids, keyword):
        """
        Build the optional ``USER_ID`` filter used by the ``_get_*`` queries.

        :param list_of_ids: iterable of user ids, or None for "no filter".
        :param keyword: SQL keyword introducing the clause ("AND" or "WHERE").
        :return: "" when *list_of_ids* is None, a clause matching no rows when it
            is empty (``IN ()`` is a syntax error), otherwise
            ``<keyword> USER_ID in (id1, id2, ...)``.
        """
        if list_of_ids is None:
            return ""
        ids = [str(user_id) for user_id in list_of_ids]
        if not ids:
            return f"{keyword} 1 = 0"
        return f"{keyword} USER_ID in ({', '.join(ids)})"

    # ------------------------------------------------------------ read / write

    def run_read_query(self, query, data):
        """
        Execute a SQL query on Snowflake and fetch the result as a DataFrame.

        Column names are lower-cased.

        :param query: SQL query string to execute.
        :param data: label for the data being read, used only in printed messages.
        :return: pandas DataFrame with the query results, or None if the query
            failed. The error is printed, not raised.
        """
        try:
            dataframe = self.session.sql(query).to_pandas()
            dataframe.columns = dataframe.columns.str.lower()
            print(f"reading {data} table successfully")
            return dataframe
        except Exception as e:
            print(f"Error in reading {data} table: {e}")
            return None

    def is_json_parsed_to_collection(self, s):
        """
        Return True if *s* is JSON text that decodes to an object or an array.

        :param s: candidate JSON text (str, bytes or bytearray).
        :return: True for a JSON object/array. False for JSON scalars, invalid
            JSON, or non-text input such as None or a float NaN.
        """
        try:
            parsed = json.loads(s)
        except (TypeError, ValueError, RecursionError):
            # JSONDecodeError and UnicodeDecodeError are ValueError subclasses;
            # TypeError covers non-text input; RecursionError covers pathological nesting.
            return False
        return isinstance(parsed, (dict, list))

    def store_df_to_snowflake(self, table_name, dataframe, database="ONLINE_RECSYS", schema="GENERATED_DATA"):
        """
        Write a DataFrame to a Snowflake table, overwriting existing data.

        The session's current database and schema are switched to *database*
        and *schema* before writing. Column names are upper-cased, the index is
        dropped, and the table is auto-created if needed.

        :param table_name: target table name (stripped and upper-cased).
        :param dataframe: pandas DataFrame to write. The caller's object is not modified.
        :param database: database the session is switched to before writing.
        :param schema: schema the session is switched to before writing.
        :return: None. Errors are printed, not raised.
        """
        try:
            self.session.use_database(database)
            self.session.use_schema(schema)

            # reset_index returns a new frame, so the caller's DataFrame is untouched.
            dataframe = dataframe.reset_index(drop=True)
            dataframe.columns = dataframe.columns.str.upper()

            self.session.write_pandas(df=dataframe,
                                      table_name=table_name.strip().upper(),
                                      auto_create_table=True,
                                      overwrite=True,
                                      use_logical_type=True)
            print(f"Data inserted into {table_name} successfully.")
        except Exception as e:
            print(f"Error in creating/updating/inserting table: {e}")

    # ---------------------------------------------------------- query builders

    def get_data(self, data, list_of_ids=None):
        """
        Fetch one of the supported datasets for this brand.

        :param data: one of 'users', 'contents', 'interactions', 'recsys',
            'popular_contents'. Dispatched to the matching ``_get_<data>`` method.
        :param list_of_ids: optional user ids to restrict the query to. It is
            ignored by 'contents' and 'popular_contents'.
        :return: DataFrame from run_read_query, or None if the query failed.
        :raises ValueError: if *data* is not a supported dataset name.
        :raises NotImplementedError: if the matching query builder is missing.
        """
        valid_data = {'users', 'contents', 'interactions', 'recsys', 'popular_contents'}
        if data not in valid_data:
            raise ValueError(f"Invalid data type: {data}")

        method_name = f"_get_{data}"
        method = getattr(self, method_name, None)
        if method is None:
            raise NotImplementedError(f"The method {method_name} is not implemented.")

        query = method(list_of_ids)
        return self.run_read_query(query, data)

    def _get_contents(self, list_of_ids=None):
        """Return the SQL for this brand's vectorized content (list_of_ids is unused)."""
        query = f"""
        select CONTENT_ID, CONTENT_TYPE, CONTENT_PROFILE as content_info --, CONTENT_PROFILE_VECTOR
        from ONLINE_RECSYS.VECTOR_DB.VECTORIZED_CONTENT
        where BRAND = {self._quote_literal(self.brand)}
        """
        return query

    def _get_users(self, list_of_ids=None):
        """
        Return the SQL for this brand's users, optionally filtered by USER_ID.

        BIRTHDAY_REMINDER is the number of days from today to the user's next
        birthday. This year's date is used unless it has already passed, in
        which case next year's date is used.
        """
        condition = self._id_condition(list_of_ids, "AND")
        query = f"""
        select USER_ID, BRAND, FIRST_NAME, BIRTHDAY, TIMEZONE, EMAIL, CURRENT_TIMESTAMP() AS TIMESTAMP, DIFFICULTY, SELF_REPORT_DIFFICULTY, USER_PROFILE as user_info, PERMISSION, EXPIRATION_DATE,
        DATEDIFF(
            day,
            CURRENT_DATE(),
            CASE
                WHEN DATE_FROM_PARTS(YEAR(CURRENT_DATE()), EXTRACT(MONTH FROM BIRTHDAY), EXTRACT(DAY FROM BIRTHDAY)) < CURRENT_DATE()
                THEN DATE_FROM_PARTS(YEAR(CURRENT_DATE()) + 1, EXTRACT(MONTH FROM BIRTHDAY), EXTRACT(DAY FROM BIRTHDAY))
                ELSE DATE_FROM_PARTS(YEAR(CURRENT_DATE()), EXTRACT(MONTH FROM BIRTHDAY), EXTRACT(DAY FROM BIRTHDAY))
            END) AS birthday_reminder
        from ONLINE_RECSYS.PREPROCESSED.USERS
        where BRAND = {self._quote_literal(self.brand)} {condition}
        """
        return query

    def _get_interactions(self, list_of_ids=None):
        """
        Return the SQL for each user's most recent 'Video Completed'/'Video Playing'
        interaction, joined with that content's profile, optionally filtered by USER_ID.
        """
        condition = self._id_condition(list_of_ids, "AND")
        # NOTE(review): the table name is spelled RECSYS_INTEACTIONS; confirm it matches the real table.
        query = f"""
        WITH latest_interactions AS(
            SELECT
                USER_ID, CONTENT_ID, CONTENT_TYPE, EVENT_TEXT, TIMESTAMP,
                ROW_NUMBER() OVER(PARTITION BY USER_ID ORDER BY TIMESTAMP DESC) AS rn
            FROM ONLINE_RECSYS.PREPROCESSED.RECSYS_INTEACTIONS
            WHERE BRAND = {self._quote_literal(self.brand)} AND EVENT_TEXT IN('Video Completed', 'Video Playing') {condition})

        SELECT i.USER_ID, i.CONTENT_ID, i.CONTENT_TYPE, c.content_profile as last_completed_content, i.EVENT_TEXT, i.TIMESTAMP, DATEDIFF('week', i.TIMESTAMP, CURRENT_TIMESTAMP) AS weeks_since_last_interaction
        FROM latest_interactions i
        LEFT JOIN
            ONLINE_RECSYS.VECTOR_DB.VECTORIZED_CONTENT c ON c.CONTENT_ID = i.CONTENT_ID
        WHERE rn = 1;
        """
        return query

    def _get_recsys(self, list_of_ids=None):
        """Return the SQL for this brand's recsys result column, optionally filtered by USER_ID."""
        condition = self._id_condition(list_of_ids, "WHERE")
        # The brand selects the column name (an identifier), so it is not quoted as a literal.
        recsys_col = f"{self.brand}_recsys_v3"
        query = f"""
        select USER_ID, {recsys_col} as recsys_result
        from RECSYS_V3.RECSYS_CIO.RECSYS_V3_CUSTOMER_IO_OLD
        {condition}
        """
        return query

    def _get_popular_contents(self, list_of_ids=None):
        """Return the SQL for this brand's popular content (list_of_ids is unused)."""
        query = f"""
        select POPULAR_CONTENT
        from RECSYS_V3.RECSYS_CIO.POPULAR_CONTENT_CUSTOMER_IO_OLD
        where brand = {self._quote_literal(self.brand.lower())}
        """
        return query

    def extract_id_from_email(self, emails):
        """
        Look up user ids for a collection of email addresses.

        :param emails: iterable of email address strings.
        :return: DataFrame with ``user_id`` and ``email`` columns, or None if the
            query failed. An empty input returns an empty DataFrame without
            querying Snowflake, because ``IN ()`` is not valid SQL.
        """
        email_list = list(emails)
        if not email_list:
            return pd.DataFrame(columns=["user_id", "email"])

        # Escape every address: valid emails may legitimately contain a single quote.
        email_list_str = ", ".join(self._quote_literal(email) for email in email_list)
        query = f"""
        SELECT id as USER_ID, email as EMAIL
        FROM STITCH.MUSORA_ECOM_DB.USORA_USERS
        WHERE email IN ({email_list_str})
        """
        return self.run_read_query(query, data="User_ids")

    def adjust_dataframe(self, dataframe):
        """
        Conform a DataFrame to the column layout in ``self.final_columns``.

        Column names are matched case-insensitively. Columns not listed are
        dropped, missing ones are added filled with None, and the result
        follows the order and spelling of ``self.final_columns``. The input
        DataFrame is not modified.

        :param dataframe: pandas DataFrame to conform.
        :return: a new DataFrame with exactly the ``self.final_columns`` columns.
        """
        final_df = dataframe.copy()
        final_df.columns = final_df.columns.str.lower()
        expected_cols = [col.lower() for col in self.final_columns]
        expected_set = set(expected_cols)

        # Explicit copy so adding missing columns below doesn't write into a
        # column-selected slice (pandas SettingWithCopyWarning).
        available = [col for col in final_df.columns if col in expected_set]
        final_df = final_df[available].copy()

        for col in expected_cols:
            if col not in final_df.columns:
                final_df[col] = None

        final_df = final_df[expected_cols]

        # Restore the exact spelling used in self.final_columns.
        rename_mapping = {col.lower(): col for col in self.final_columns}
        return final_df.rename(columns=rename_mapping)

    def close_connection(self):
        """Close the underlying Snowpark session."""
        self.session.close()

    # ------------------------------------------------------- campaign targeting

    def get_inactive_users_by_brand(self, brand):
        """
        Return the users to target for *brand*.

        For 'singeo', users come from get_users_in_campaign with its default
        campaign settings. Other brands are read from a local test CSV via
        _get_inactive_users.
        """
        if brand == "singeo":
            return self.get_users_in_campaign(brand=brand)
        return self._get_inactive_users(brand=brand)

    def _build_campaign_query(self, brand, campaign_name, stage, campaign_view):
        """
        Return the SQL selecting emails eligible for *stage* of a campaign.

        :param stage: stage number, coerced to int. Stage 1 selects campaign
            members without a stage-1 message. Any other stage N selects members
            that have a stage N-1 message but no stage N message.
        """
        stage = int(stage)
        brand_lit = self._quote_literal(brand)
        campaign_lit = self._quote_literal(campaign_name)

        # CTEs shared by every stage: campaign members, and the messages already
        # generated for this brand/campaign. campaign_view is an identifier, not a literal.
        base_ctes = f"""
        WITH eligible_users AS (
            SELECT
                s.USER_ID,
                s.EMAIL,
                s.BRAND,
                s.CAMPAIGN_NAME
            FROM MESSAGING_SYSTEM.CAMPAIGNS.{campaign_view} s
            WHERE s.CAMPAIGN_NAME = {campaign_lit}
              AND s.BRAND = {brand_lit}
        ),
        msgs AS (
            -- Only messages from the same brand/campaign
            SELECT
                m.EMAIL,
                m.BRAND,
                m.CAMPAIGN_NAME,
                m.STAGE,
                m.TIMESTAMP
            FROM MESSAGING_SYSTEM.GENERATED_DATA.INITIAL_MESSAGES m
            WHERE m.CAMPAIGN_NAME = {campaign_lit}
              AND m.BRAND = {brand_lit}
        ),"""

        if stage == 1:
            return base_ctes + """
        no_stage1 AS (
            -- Users who do not already have stage 1
            SELECT e.*
            FROM eligible_users e
            LEFT JOIN msgs m1
                ON m1.EMAIL = e.EMAIL
               AND m1.STAGE = 1
            WHERE m1.EMAIL IS NULL
        )
        SELECT
            n.EMAIL
        FROM no_stage1 n
        ;
        """

        # NOTE(review): a user with several stage N-1 messages appears once per
        # message in has_prev, so the result may contain duplicate emails;
        # confirm whether DISTINCT is wanted.
        # The 36-hour filter below is commented out in the SQL. Because every
        # has_prev user has a row in msgs, the latest_msg join currently
        # filters nothing.
        return base_ctes + f"""
        has_prev AS (
            -- Users who have the immediately previous stage (N-1)
            SELECT e.*
            FROM eligible_users e
            JOIN msgs mp
                ON mp.EMAIL = e.EMAIL
               AND mp.STAGE = {stage - 1}
        ),
        no_current_stage AS (
            -- Exclude users who already have the current stage (N)
            SELECT e.*
            FROM has_prev e
            LEFT JOIN msgs mn
                ON mn.EMAIL = e.EMAIL
               AND mn.STAGE = {stage}
            WHERE mn.EMAIL IS NULL
        ),
        latest_msg AS (
            -- Most-recent message timestamp per user (within this brand/campaign)
            SELECT
                EMAIL,
                MAX(TIMESTAMP) AS LAST_TS
            FROM msgs
            GROUP BY EMAIL
        )
        SELECT
            n.EMAIL
        FROM no_current_stage n
        JOIN latest_msg l
            ON l.EMAIL = n.EMAIL
        -- Ensure no messages for this user in this brand/campaign in the last 36 hours
        --WHERE l.LAST_TS < DATEADD(HOUR, -36, CURRENT_TIMESTAMP())
        ;
        """

    def get_users_in_campaign(self, brand, campaign_name="Singeo - Inactive Members (for 3 days) - Re-engagement", stage=1, campaign_view='singeo_re_engagement'):
        """
        Fetch the emails of campaign members who are due the given stage.

        :param brand: brand the campaign members and messages must belong to.
        :param campaign_name: CAMPAIGN_NAME to match in the view and message table.
        :param stage: stage to select users for (see _build_campaign_query).
        :param campaign_view: view name under MESSAGING_SYSTEM.CAMPAIGNS.
        :return: DataFrame with an ``email`` column, or None if the query failed.
        """
        query = self._build_campaign_query(brand, campaign_name, stage, campaign_view)
        return self.run_read_query(query, data=f"{brand}_campaign")

    def _get_inactive_users(self, brand: str) -> pd.DataFrame:
        """
        Load the user list for *brand* from a local CSV file.

        Reads ``Data/<brand lower-cased>_test.csv`` relative to the current
        working directory. Snowflake is not queried.

        NOTE(review): this looks like a test stand-in for a Snowflake lookup of
        inactive users; confirm before relying on it in production.
        """
        path = f"Data/{brand.lower()}_test.csv"
        return pd.read_csv(path)
| |
|
| |
|
| |
|