File size: 13,175 Bytes
d0e3307 707d2d4 d0e3307 b89978f d0e3307 3cb83db d0e3307 3cb83db d0e3307 3cb83db d0e3307 707d2d4 ce32f1f 707d2d4 ce32f1f 65016d2 ce32f1f | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 | """
This class wraps a Snowflake session and runs queries (read and write) against it.
"""
import json
import numpy as np
import pandas as pd
from snowflake.snowpark import Session
from sympy.strategies.branch import condition
class SnowFlakeConn:
    """Brand-scoped helper around an already-open Snowflake (Snowpark) session.

    The instance does not open the session itself; it receives one and uses it
    to run read queries (returned as pandas dataframes) and to write pandas
    dataframes back to Snowflake tables.
    """

    def __init__(self, session, brand):
        """
        :param session: session object exposing ``sql``, ``use_database``,
            ``use_schema``, ``write_pandas`` and ``close``
        :param brand: brand name used to filter every brand-scoped query
        """
        self.session = session
        self.brand = brand
        # Column set (and order) produced by adjust_dataframe().
        self.final_columns = ['user_id', "email", "user_info", "permission", "expiration_date", "recsys_result", "message", "brand", "recommendation", "segment_name", "timestamp"]
        # Per-brand campaign ids; not read by any method in this class.
        self.campaign_id = {
            "singeo": 460,
            "pianote": 457,
            "guitareo": 458,
            "drumeo": 392
        }

    # ---------------------------------------------------------------
    @staticmethod
    def _quote_literal(value):
        """
        Render ``value`` as a single-quoted SQL string literal.

        Backslashes and single quotes are escaped so that values such as
        ``o'brien@example.com`` cannot terminate the literal early.
        :param value: value to embed (converted with ``str``)
        :return: the quoted, escaped literal, e.g. ``'o''brien@example.com'``
        """
        # Backslashes first, otherwise the doubling below would be re-escaped.
        escaped = str(value).replace("\\", "\\\\").replace("'", "''")
        return f"'{escaped}'"

    # ---------------------------------------------------------------
    @staticmethod
    def _user_id_condition(list_of_ids, keyword):
        """
        Build the optional ``USER_ID in (...)`` filter shared by the query builders.

        :param list_of_ids: iterable of user ids, or None for "no filter"
        :param keyword: ``"AND"`` or ``"WHERE"``, depending on where the filter is spliced in
        :return: SQL fragment; empty string when ``list_of_ids`` is None
        """
        if list_of_ids is None:
            return ""
        ids = [str(user_id) for user_id in list_of_ids]
        if not ids:
            # "in ()" is a syntax error; an empty id list must match no rows.
            return f"{keyword} 1 = 0"
        return f"{keyword} USER_ID in ({', '.join(ids)})"

    # ---------------------------------------------------------------
    def run_read_query(self, query, data):
        """
        Executes a SQL query on Snowflake that fetches data.

        :param query: SQL query string to be executed
        :param data: label of what is being read; only used in log messages
        :return: pandas dataframe with lower-cased column names, or None if
            the query failed (the error is printed, not raised)
        """
        try:
            dataframe = self.session.sql(query).to_pandas()
            dataframe.columns = dataframe.columns.str.lower()
        except Exception as e:
            # Best-effort read: report and signal failure with None.
            print(f"Error in reading {data} table: {e}")
            return None
        print(f"reading {data} table successfully")
        return dataframe

    # ---------------------------------------------------------------
    def is_json_parsed_to_collection(self, s):
        """
        Tell whether ``s`` is JSON text that decodes to a dict or a list.

        :param s: candidate JSON text
        :return: True for a JSON object/array, False for scalars, invalid
            JSON, or inputs json.loads cannot accept (e.g. None)
        """
        try:
            parsed = json.loads(s)
        except (TypeError, ValueError):
            # ValueError covers json.JSONDecodeError; TypeError covers non-text input.
            return False
        return isinstance(parsed, (dict, list))

    # ---------------------------------------------------------------
    def store_df_to_snowflake(self, table_name, dataframe, database="ONLINE_RECSYS", schema="GENERATED_DATA"):
        """
        Writes a pandas dataframe to a Snowflake table.

        The table is written with ``auto_create_table=True`` and
        ``overwrite=True``. Errors are printed, not raised.
        :param table_name: target table name (stripped and upper-cased before use)
        :param dataframe: pandas dataframe to store; the caller's object is not modified
        :param database: database to switch the session to
        :param schema: schema to switch the session to
        :return: None
        """
        try:
            self.session.use_database(database)
            self.session.use_schema(schema)
            # reset_index returns a new frame, so renaming columns below
            # does not touch the caller's dataframe.
            dataframe = dataframe.reset_index(drop=True)
            dataframe.columns = dataframe.columns.str.upper()
            self.session.write_pandas(df=dataframe,
                                      table_name=table_name.strip().upper(),
                                      auto_create_table=True,
                                      overwrite=True,
                                      use_logical_type=True)
            print(f"Data inserted into {table_name} successfully.")
        except Exception as e:
            print(f"Error in creating/updating/inserting table: {e}")

    # ---------------------------------------------------------------
    def get_data(self, data, list_of_ids=None):
        """
        Fetch one of the supported datasets for the current brand.

        valid Data is = {users, contents, interactions, recsys, popular_contents}
        :param data: dataset name; selects the ``_get_<data>`` query builder
        :param list_of_ids: optional user ids to restrict the query to
            (ignored by the builders that have no user filter)
        :return: result of run_read_query (dataframe, or None on failure)
        :raises ValueError: if ``data`` is not a supported dataset name
        :raises NotImplementedError: if the matching builder method is missing
        """
        valid_data = {'users', 'contents', 'interactions', 'recsys', 'popular_contents'}
        if data not in valid_data:
            raise ValueError(f"Invalid data type: {data}")

        # Resolve the query builder from the dataset name.
        method_name = f"_get_{data}"
        method = getattr(self, method_name, None)
        if method is None:
            raise NotImplementedError(f"The method {method_name} is not implemented.")

        query = method(list_of_ids)
        return self.run_read_query(query, data)

    # ---------------------------------------------------------------
    def _get_contents(self, list_of_ids=None):
        """
        Build the query that reads content profiles for the current brand.

        :param list_of_ids: unused; accepted so all builders share one signature
        :return: SQL query string
        """
        query = f"""
        select CONTENT_ID, CONTENT_TYPE, CONTENT_PROFILE as content_info --, CONTENT_PROFILE_VECTOR
        from ONLINE_RECSYS.VECTOR_DB.VECTORIZED_CONTENT
        where BRAND = {self._quote_literal(self.brand)}
        """
        return query

    # ---------------------------------------------------------------
    def _get_users(self, list_of_ids=None):
        """
        Build the query that reads user rows for the current brand.

        Besides the stored columns, the query computes ``birthday_reminder``:
        the number of days from today to the next occurrence of the birthday.
        :param list_of_ids: optional user ids to restrict to; an empty list matches no rows
        :return: SQL query string
        """
        condition = self._user_id_condition(list_of_ids, "AND")
        query = f"""
        select USER_ID, BRAND, FIRST_NAME, BIRTHDAY, TIMEZONE, EMAIL, CURRENT_TIMESTAMP() AS TIMESTAMP, DIFFICULTY, SELF_REPORT_DIFFICULTY, USER_PROFILE as user_info, PERMISSION, EXPIRATION_DATE,
        DATEDIFF(
        day,
        CURRENT_DATE(),
        CASE
        WHEN DATE_FROM_PARTS(YEAR(CURRENT_DATE()), EXTRACT(MONTH FROM BIRTHDAY), EXTRACT(DAY FROM BIRTHDAY)) < CURRENT_DATE()
        THEN DATE_FROM_PARTS(YEAR(CURRENT_DATE()) + 1, EXTRACT(MONTH FROM BIRTHDAY), EXTRACT(DAY FROM BIRTHDAY))
        ELSE DATE_FROM_PARTS(YEAR(CURRENT_DATE()), EXTRACT(MONTH FROM BIRTHDAY), EXTRACT(DAY FROM BIRTHDAY))
        END) AS birthday_reminder
        from ONLINE_RECSYS.PREPROCESSED.USERS
        where BRAND = {self._quote_literal(self.brand)} {condition}
        """
        return query

    # ---------------------------------------------------------------
    def _get_interactions(self, list_of_ids=None):
        """
        Build the query that reads each user's most recent video interaction.

        Only 'Video Completed' / 'Video Playing' events of the current brand
        are considered; the newest one per user is kept (rn = 1) and joined
        to its content profile.
        :param list_of_ids: optional user ids to restrict to; an empty list matches no rows
        :return: SQL query string
        """
        condition = self._user_id_condition(list_of_ids, "AND")
        # NOTE(review): "RECSYS_INTEACTIONS" looks like a typo but is kept as-is
        # because it may be the real table name - confirm against the schema.
        query = f"""
        WITH latest_interactions AS(
        SELECT
        USER_ID, CONTENT_ID, CONTENT_TYPE, EVENT_TEXT, TIMESTAMP,
        ROW_NUMBER() OVER(PARTITION BY USER_ID ORDER BY TIMESTAMP DESC) AS rn
        FROM ONLINE_RECSYS.PREPROCESSED.RECSYS_INTEACTIONS
        WHERE BRAND = {self._quote_literal(self.brand)} AND EVENT_TEXT IN('Video Completed', 'Video Playing') {condition})
        SELECT i.USER_ID, i.CONTENT_ID, i.CONTENT_TYPE, c.content_profile as last_completed_content, i.EVENT_TEXT, i.TIMESTAMP, DATEDIFF('week', i.TIMESTAMP, CURRENT_TIMESTAMP) AS weeks_since_last_interaction
        FROM latest_interactions i
        LEFT JOIN
        ONLINE_RECSYS.VECTOR_DB.VECTORIZED_CONTENT c ON c.CONTENT_ID = i.CONTENT_ID
        WHERE rn = 1;
        """
        return query

    # ---------------------------------------------------------------
    def _get_recsys(self, list_of_ids=None):
        """
        Build the query that reads the recommender output column of the current brand.

        The column name is derived from the brand (``<brand>_recsys_v3``).
        :param list_of_ids: optional user ids to restrict to; an empty list matches no rows
        :return: SQL query string
        """
        condition = self._user_id_condition(list_of_ids, "WHERE")
        recsys_col = f"{self.brand}_recsys_v3"
        query = f"""
        select USER_ID, {recsys_col} as recsys_result
        from RECSYS_V3.RECSYS_CIO.RECSYS_V3_CUSTOMER_IO_OLD
        {condition}
        """
        return query

    # ---------------------------------------------------------------
    def _get_popular_contents(self, list_of_ids=None):
        """
        Build the query that reads the popular-content entry of the current brand.

        :param list_of_ids: unused; accepted so all builders share one signature
        :return: SQL query string
        """
        query = f"""
        select POPULAR_CONTENT
        from RECSYS_V3.RECSYS_CIO.POPULAR_CONTENT_CUSTOMER_IO_OLD
        where brand = {self._quote_literal(self.brand.lower())}
        """
        return query

    # ---------------------------------------------------------------
    def extract_id_from_email(self, emails):
        """
        extracting user_ids from emails

        :param emails: iterable of email addresses to look up
        :return: result of run_read_query with the id/email pairs
            (dataframe, or None on failure)
        """
        literals = [self._quote_literal(email) for email in emails]
        if literals:
            email_filter = f"email IN ({', '.join(literals)})"
        else:
            # "IN ()" is a syntax error; no emails means no matching rows.
            email_filter = "1 = 0"
        query = f"""
        SELECT id as USER_ID, email as EMAIL
        FROM STITCH.MUSORA_ECOM_DB.USORA_USERS
        WHERE {email_filter}
        """
        user_ids_df = self.run_read_query(query, data="User_ids")
        return user_ids_df

    # ---------------------------------------------------------------
    def adjust_dataframe(self, dataframe):
        """
        Filter dataframe to only include the columns in self.final_columns.
        Add any missing columns with None values.
        Ensure the final order is consistent with self.final_columns.

        :param dataframe: input dataframe; it is not modified
        :return: new dataframe with exactly the columns of self.final_columns
        """
        # Work with a copy so that we don't modify the original input
        final_df = dataframe.copy()
        # Normalize column names to lower-case for matching
        final_df.columns = final_df.columns.str.lower()
        expected_cols = [col.lower() for col in self.final_columns]
        expected = set(expected_cols)

        # Keep only the expected columns; copy so the assignments below
        # operate on an independent frame.
        available = [col for col in final_df.columns if col in expected]
        final_df = final_df[available].copy()

        # Add missing columns with None values
        for col in expected_cols:
            if col not in final_df.columns:
                final_df[col] = None

        # Reorder, then restore the exact spelling used in self.final_columns.
        final_df = final_df[expected_cols]
        rename_mapping = {col.lower(): col for col in self.final_columns}
        return final_df.rename(columns=rename_mapping)

    # ---------------------------------------------------------------
    def close_connection(self):
        """Close the underlying session."""
        self.session.close()

    # ---------------------------------------------------------------
    def get_inactive_users_by_brand(self, brand):
        """
        Fetch the users to re-engage for a brand.

        "singeo" is read from the campaign view via get_users_in_campaign;
        every other brand goes through _get_inactive_users.
        :param brand: brand name
        :return: dataframe of users (or None if the campaign query failed)
        """
        if brand == "singeo":
            return self.get_users_in_campaign(brand=brand)
        return self._get_inactive_users(brand=brand)

    # ===============================================================
    def _build_campaign_query(self, brand, campaign_name, stage, campaign_view):
        """
        Build the SQL that selects the emails eligible for a campaign stage.

        :param brand: brand to filter on
        :param campaign_name: campaign to filter on
        :param stage: stage number; 1 selects users with no stage-1 message,
            any other value selects users that have stage N-1 but not stage N
        :param campaign_view: view name under MESSAGING_SYSTEM.CAMPAIGNS
        :return: SQL query string
        """
        campaign_literal = self._quote_literal(campaign_name)
        brand_literal = self._quote_literal(brand)
        # Both stage variants start from the same two CTEs.
        shared_ctes = f"""
        WITH eligible_users AS (
        SELECT
        s.USER_ID,
        s.EMAIL,
        s.BRAND,
        s.CAMPAIGN_NAME
        FROM MESSAGING_SYSTEM.CAMPAIGNS.{campaign_view} s
        WHERE s.CAMPAIGN_NAME = {campaign_literal}
        AND s.BRAND = {brand_literal}
        ),
        msgs AS (
        -- Only messages from the same brand/campaign
        SELECT
        m.EMAIL,
        m.BRAND,
        m.CAMPAIGN_NAME,
        m.STAGE,
        m.TIMESTAMP
        FROM MESSAGING_SYSTEM.GENERATED_DATA.INITIAL_MESSAGES m
        WHERE m.CAMPAIGN_NAME = {campaign_literal}
        AND m.BRAND = {brand_literal}
        ),"""

        if stage == 1:
            return f"""{shared_ctes}
        no_stage1 AS (
        -- Users who do not already have stage 1
        SELECT e.*
        FROM eligible_users e
        LEFT JOIN msgs m1
        ON m1.EMAIL = e.EMAIL
        AND m1.STAGE = 1
        WHERE m1.EMAIL IS NULL
        )
        SELECT
        n.EMAIL
        FROM no_stage1 n
        ;
        """

        return f"""{shared_ctes}
        has_prev AS (
        -- Users who have the immediately previous stage (N-1)
        SELECT e.*
        FROM eligible_users e
        JOIN msgs mp
        ON mp.EMAIL = e.EMAIL
        AND mp.STAGE = {stage} - 1
        ),
        no_current_stage AS (
        -- Exclude users who already have the current stage (N)
        SELECT e.*
        FROM has_prev e
        LEFT JOIN msgs mn
        ON mn.EMAIL = e.EMAIL
        AND mn.STAGE = {stage}
        WHERE mn.EMAIL IS NULL
        ),
        latest_msg AS (
        -- Most-recent message timestamp per user (within this brand/campaign)
        SELECT
        EMAIL,
        MAX(TIMESTAMP) AS LAST_TS
        FROM msgs
        GROUP BY EMAIL
        )
        SELECT
        n.EMAIL
        FROM no_current_stage n
        JOIN latest_msg l
        ON l.EMAIL = n.EMAIL
        -- Ensure no messages for this user in this brand/campaign in the last 36 hours
        --WHERE l.LAST_TS < DATEADD(HOUR, -36, CURRENT_TIMESTAMP())
        ;
        """

    # ===============================================================
    def get_users_in_campaign(self, brand, campaign_name="Singeo - Inactive Members (for 3 days) - Re-engagement", stage=1, campaign_view='singeo_re_engagement'):
        """
        Fetch the emails of users eligible for the given campaign stage.

        :param brand: brand to filter on
        :param campaign_name: campaign to filter on
        :param stage: stage number (see _build_campaign_query)
        :param campaign_view: view name under MESSAGING_SYSTEM.CAMPAIGNS
        :return: result of run_read_query with an ``email`` column
            (dataframe, or None on failure)
        """
        query = self._build_campaign_query(brand, campaign_name, stage, campaign_view)
        users_df = self.run_read_query(query, data=f"{brand}_campaign")
        return users_df

    # ===============================================================
    def _get_inactive_users(self, brand: str):
        """
        Load the users of a brand from the local file ``Data/<brand>_test.csv``.

        No Snowflake query is issued here; the CSV is returned as-is.
        :param brand: brand name; lower-cased to build the file name
        :return: pandas dataframe with the CSV contents
        """
        path = f"Data/{brand.lower()}_test.csv"
        users_df = pd.read_csv(path)
        return users_df
|