# Danialebrat's picture
# - Adding csv files for each brand to speed up the loading time
# 65016d2
"""
This class create a connection to Snowflake, run queries (read and write)
"""
import json
import numpy as np
import pandas as pd
from snowflake.snowpark import Session
from sympy.strategies.branch import condition
class SnowFlakeConn:
def __init__(self, session, brand):
self. session = session
self.brand = brand
self.final_columns = ['user_id', "email", "user_info", "permission", "expiration_date", "recsys_result", "message", "brand", "recommendation", "segment_name", "timestamp"]
self.campaign_id = {
"singeo": 460,
"pianote": 457,
"guitareo": 458,
"drumeo": 392
}
# ---------------------------------------------------------------
# ---------------------------------------------------------------
def run_read_query(self, query, data):
"""
Executes a SQL query on Snowflake that fetch the data
:return: Pandas dataframe containing the query results
"""
# Connect to Snowflake
try:
dataframe = self.session.sql(query).to_pandas()
dataframe.columns = dataframe.columns.str.lower()
print(f"reading {data} table successfully")
return dataframe
except Exception as e:
print(f"Error in creating/updating table: {e}")
# ---------------------------------------------------------------
# ---------------------------------------------------------------
def is_json_parsed_to_collection(self, s):
try:
parsed = json.loads(s)
return isinstance(parsed, (dict, list))
except:
return False
# ---------------------------------------------------------------
# ---------------------------------------------------------------
def store_df_to_snowflake(self, table_name, dataframe, database="ONLINE_RECSYS", schema="GENERATED_DATA"):
"""
Executes a SQL query on Snowflake that write the preprocessed data on new tables
:param query: SQL query string to be executed
:return: None
"""
try:
self.session.use_database(database)
self.session.use_schema(schema)
dataframe = dataframe.reset_index(drop=True)
dataframe.columns = dataframe.columns.str.upper()
self.session.write_pandas(df=dataframe,
table_name=table_name.strip().upper(),
auto_create_table=True,
overwrite=True,
use_logical_type=True)
print(f"Data inserted into {table_name} successfully.")
except Exception as e:
print(f"Error in creating/updating/inserting table: {e}")
# ---------------------------------------------------------------
# ---------------------------------------------------------------
def get_data(self, data, list_of_ids=None):
"""
valid Data is = {users, contents, interactions, recsys, popular_contents}
:param data:
:return:
"""
valid_data = {'users', 'contents', 'interactions', 'recsys', 'popular_contents'}
if data not in valid_data:
raise ValueError(f"Invalid data type: {data}")
# Construct the method name based on the input
method_name = f"_get_{data}"
# Retrieve the method dynamically
method = getattr(self, method_name, None)
if method is None:
raise NotImplementedError(f"The method {method_name} is not implemented.")
query = method(list_of_ids)
data = self.run_read_query(query, data)
return data
# ---------------------------------------------------------------
# ---------------------------------------------------------------
def _get_contents(self, list_of_ids=None):
query = f"""
select CONTENT_ID, CONTENT_TYPE, CONTENT_PROFILE as content_info --, CONTENT_PROFILE_VECTOR
from ONLINE_RECSYS.VECTOR_DB.VECTORIZED_CONTENT
where BRAND = '{self.brand}'
"""
return query
# ---------------------------------------------------------------
# ---------------------------------------------------------------
def _get_users(self, list_of_ids=None):
if list_of_ids is not None:
ids_str = "(" + ", ".join(map(str, list_of_ids)) + ")"
condition = f"AND USER_ID in {ids_str}"
else :
condition = ""
query = f"""
select USER_ID, BRAND, FIRST_NAME, BIRTHDAY, TIMEZONE, EMAIL, CURRENT_TIMESTAMP() AS TIMESTAMP, DIFFICULTY, SELF_REPORT_DIFFICULTY, USER_PROFILE as user_info, PERMISSION, EXPIRATION_DATE,
DATEDIFF(
day,
CURRENT_DATE(),
CASE
WHEN DATE_FROM_PARTS(YEAR(CURRENT_DATE()), EXTRACT(MONTH FROM BIRTHDAY), EXTRACT(DAY FROM BIRTHDAY)) < CURRENT_DATE()
THEN DATE_FROM_PARTS(YEAR(CURRENT_DATE()) + 1, EXTRACT(MONTH FROM BIRTHDAY), EXTRACT(DAY FROM BIRTHDAY))
ELSE DATE_FROM_PARTS(YEAR(CURRENT_DATE()), EXTRACT(MONTH FROM BIRTHDAY), EXTRACT(DAY FROM BIRTHDAY))
END) AS birthday_reminder
from ONLINE_RECSYS.PREPROCESSED.USERS
where BRAND = '{self.brand}' {condition}
"""
return query
# ---------------------------------------------------------------
# ---------------------------------------------------------------
def _get_interactions(self, list_of_ids=None):
if list_of_ids is not None:
ids_str = "(" + ", ".join(map(str, list_of_ids)) + ")"
condition = f"AND USER_ID in {ids_str}"
else :
condition = ""
query = f"""
WITH latest_interactions AS(
SELECT
USER_ID, CONTENT_ID, CONTENT_TYPE, EVENT_TEXT, TIMESTAMP,
ROW_NUMBER() OVER(PARTITION BY USER_ID ORDER BY TIMESTAMP DESC) AS rn
FROM ONLINE_RECSYS.PREPROCESSED.RECSYS_INTEACTIONS
WHERE BRAND = '{self.brand}' AND EVENT_TEXT IN('Video Completed', 'Video Playing') {condition})
SELECT i.USER_ID, i.CONTENT_ID, i.CONTENT_TYPE, c.content_profile as last_completed_content, i.EVENT_TEXT, i.TIMESTAMP, DATEDIFF('week', i.TIMESTAMP, CURRENT_TIMESTAMP) AS weeks_since_last_interaction
FROM latest_interactions i
LEFT JOIN
ONLINE_RECSYS.VECTOR_DB.VECTORIZED_CONTENT c ON c.CONTENT_ID = i.CONTENT_ID
WHERE rn = 1;
"""
return query
# ---------------------------------------------------------------
# ---------------------------------------------------------------
def _get_recsys(self, list_of_ids=None):
if list_of_ids is not None:
ids_str = "(" + ", ".join(map(str, list_of_ids)) + ")"
condition = f"WHERE USER_ID in {ids_str}"
else :
condition = ""
recsys_col = f"{self.brand}_recsys_v3"
query = f"""
select USER_ID, {recsys_col} as recsys_result
from RECSYS_V3.RECSYS_CIO.RECSYS_V3_CUSTOMER_IO_OLD
{condition}
"""
return query
# ---------------------------------------------------------------
# ---------------------------------------------------------------
def _get_popular_contents(self, list_of_ids=None):
query = f"""
select POPULAR_CONTENT
from RECSYS_V3.RECSYS_CIO.POPULAR_CONTENT_CUSTOMER_IO_OLD
where brand = '{self.brand.lower()}'
"""
return query
# ---------------------------------------------------------------
# ---------------------------------------------------------------
def extract_id_from_email(self, emails):
"""
extracting user_ids from emails
:param unique_emails:
:return:
"""
email_list_str = ', '.join(f"'{email}'" for email in emails)
query = f"""
SELECT id as USER_ID, email as EMAIL
FROM STITCH.MUSORA_ECOM_DB.USORA_USERS
WHERE email IN ({email_list_str})
"""
user_ids_df = self.run_read_query(query, data="User_ids")
return user_ids_df
# ---------------------------------------------------------------
# ---------------------------------------------------------------
def adjust_dataframe(self, dataframe):
"""
Filter dataframe to only include the columns in self.final_columns.
Add any missing columns with None values.
Ensure the final order is consistent with self.final_columns.
"""
# Work with a copy so that we don't modify the original input
final_df = dataframe.copy()
# Normalize column names to lower-case for matching (if needed)
final_df.columns = final_df.columns.str.lower()
expected_cols = [col.lower() for col in self.final_columns]
# Keep only those columns in the expected list
available = [col for col in final_df.columns if col in expected_cols]
final_df = final_df[available]
# Add missing columns with None values
for col in expected_cols:
if col not in final_df.columns:
final_df[col] = None
# Reorder the columns to the desired order
final_df = final_df[expected_cols]
# If you need the column names to match exactly what self.final_columns provides (case-sensitive),
# you can rename them accordingly.
rename_mapping = {col.lower(): col for col in self.final_columns}
final_df.rename(columns=rename_mapping, inplace=True)
return final_df
# ---------------------------------------------------------------
# ---------------------------------------------------------------
def close_connection(self):
self.session.close()
def get_inactive_users_by_brand(self, brand):
if brand == "singeo":
return self.get_users_in_campaign(brand=brand)
else:
return self._get_inactive_users(brand=brand)
# ===============================================================
    def get_users_in_campaign(self, brand, campaign_name="Singeo - Inactive Members (for 3 days) - Re-engagement", stage=1, campaign_view='singeo_re_engagement'):
        """
        Build and run the query selecting campaign users who are due for the
        given message *stage*.

        Stage 1 selects eligible users who have not yet received a stage-1
        message; later stages select users who received stage N-1 but not yet
        stage N (a 36-hour throttle on the most recent message exists in the
        SQL but is currently commented out).

        :param brand: brand the campaign belongs to
        :param campaign_name: campaign name to filter eligible users and messages on
        :param stage: message stage to select users for (1-based)
        :param campaign_view: view in MESSAGING_SYSTEM.CAMPAIGNS that lists the
                              campaign's eligible users
        :return: DataFrame of e-mail addresses, or None on query failure
                 (see run_read_query)
        """
        # camp_id = self.campaign_id[brand]
        if stage == 1:
            # NOTE(review): the trailing comma after n.EMAIL relies on Snowflake
            # accepting trailing commas in the SELECT list — confirm intent.
            query = f"""
            WITH eligible_users AS (
                SELECT
                    s.USER_ID,
                    s.EMAIL,
                    s.BRAND,
                    s.CAMPAIGN_NAME
                FROM MESSAGING_SYSTEM.CAMPAIGNS.{campaign_view} s
                WHERE s.CAMPAIGN_NAME = '{campaign_name}'
                  AND s.BRAND = '{brand}'
            ),
            msgs AS (
                -- Only messages from the same brand/campaign
                SELECT
                    m.EMAIL,
                    m.BRAND,
                    m.CAMPAIGN_NAME,
                    m.STAGE,
                    m.TIMESTAMP
                FROM MESSAGING_SYSTEM.GENERATED_DATA.INITIAL_MESSAGES m
                WHERE m.CAMPAIGN_NAME = '{campaign_name}'
                  AND m.BRAND = '{brand}'
            ),
            no_stage1 AS (
                -- Users who do not already have stage 1
                SELECT e.*
                FROM eligible_users e
                LEFT JOIN msgs m1
                       ON m1.EMAIL = e.EMAIL
                      AND m1.STAGE = 1
                WHERE m1.EMAIL IS NULL
            )
            SELECT
                n.EMAIL,
            FROM no_stage1 n
            ;
            """
        else:
            query = f"""
            WITH eligible_users AS (
                SELECT
                    s.USER_ID,
                    s.EMAIL,
                    s.BRAND,
                    s.CAMPAIGN_NAME
                FROM MESSAGING_SYSTEM.CAMPAIGNS.{campaign_view} s
                WHERE s.CAMPAIGN_NAME = '{campaign_name}'
                  AND s.BRAND = '{brand}'
            ),
            msgs AS (
                -- Only messages from the same brand/campaign
                SELECT
                    m.EMAIL,
                    m.BRAND,
                    m.CAMPAIGN_NAME,
                    m.STAGE,
                    m.TIMESTAMP
                FROM MESSAGING_SYSTEM.GENERATED_DATA.INITIAL_MESSAGES m
                WHERE m.CAMPAIGN_NAME = '{campaign_name}'
                  AND m.BRAND = '{brand}'
            ),
            has_prev AS (
                -- Users who have the immediately previous stage (N-1)
                SELECT e.*
                FROM eligible_users e
                JOIN msgs mp
                  ON mp.EMAIL = e.EMAIL
                 AND mp.STAGE = {stage} - 1
            ),
            no_current_stage AS (
                -- Exclude users who already have the current stage (N)
                SELECT e.*
                FROM has_prev e
                LEFT JOIN msgs mn
                       ON mn.EMAIL = e.EMAIL
                      AND mn.STAGE = {stage}
                WHERE mn.EMAIL IS NULL
            ),
            latest_msg AS (
                -- Most-recent message timestamp per user (within this brand/campaign)
                SELECT
                    EMAIL,
                    MAX(TIMESTAMP) AS LAST_TS
                FROM msgs
                GROUP BY EMAIL
            )
            SELECT
                n.EMAIL,
            FROM no_current_stage n
            JOIN latest_msg l
              ON l.EMAIL = n.EMAIL
            -- Ensure no messages for this user in this brand/campaign in the last 36 hours
            --WHERE l.LAST_TS < DATEADD(HOUR, -36, CURRENT_TIMESTAMP())
            ;
            """
        users_df = self.run_read_query(query, data=f"{brand}_campaign")
        return users_df
def _get_inactive_users(self, brand: str):
"""
Return up to 50 USER_IDs in the requested brand whose last interaction is > 5 days ago,
restricted to users that exist in ONLINE_RECSYS.PREPROCESSED.USERS.
"""
path = f"Data/{brand.lower()}_test.csv"
users_df = pd.read_csv(path)
return users_df