File size: 8,753 Bytes
d0e3307 514a1ba d0e3307 53d317f d0e3307 77c4795 d0e3307 4c91236 d0e3307 b89978f d0e3307 93839a6 d0e3307 093632f 86ff876 093632f 53d317f b942b6b 53d317f d65dfbf b942b6b 53d317f d65dfbf d0e3307 b942b6b d0e3307 ce32f1f d0e3307 6f416a9 d0e3307 6f416a9 d0e3307 ce32f1f d0e3307 514a1ba 77c4795 d0e3307 514a1ba d0e3307 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 | """
The flow of the program starts from the `create_personalize_messages` method of the `Permes` class.
"""
import time
from tqdm import tqdm
from Messaging_system.DataCollector import DataCollector
from Messaging_system.CoreConfig import CoreConfig
from Messaging_system.LLMR import LLMR
import streamlit as st
from Messaging_system.Message_generator import MessageGenerator
from Messaging_system.PromptGenerator import PromptGenerator
from Messaging_system.SnowFlakeConnection import SnowFlakeConn
from Messaging_system.Homepage_Recommender import DefaultRec
class Permes:
    """
    LLM-based personalized message generator.

    `create_personalize_messages` is the public entry point: it builds a
    `CoreConfig`, runs the generation pipeline (`_create_personalized_message`)
    and reports an estimated cost based on the token counters stored on the
    config.
    """

    # USD price per 1M tokens, keyed by model name: (input_price, output_price).
    # Models that are not listed are priced at (0, 0) by `get_model_price`.
    _MODEL_PRICES = {
        "gpt-4o-mini": (0.15, 0.6),
        "gpt-4.1-mini": (0.4, 1.6),
        "gpt-5-mini": (0.25, 2),
        "gpt-5-nano": (0.05, 0.4),
        "gemini-2.5-flash": (0.3, 2.5),
        "gemini-2.0-flash": (0.1, 0.7),
        "gemini-2.5-flash-lite": (0.1, 0.4),
        "claude-3-5-haiku-latest": (0.8, 3),
        "google/gemma-3-27b-instruct/bf-16": (0.15, 0.3),
    }

    def create_personalize_messages(self, session, users, brand, config_file, openai_api_key,
                                    platform="push", number_of_messages=1, instructionset=None,
                                    subsequent_examples=None, recsys_contents=None, model=None,
                                    identifier_column="user_id", segment_info=None,
                                    sample_example=None, number_of_samples=None,
                                    involve_recsys_result=False, messaging_mode="message",
                                    ongoing_df=None, personalization=False,
                                    progress_callback=None, segment_name="no_recent_activity"):
        """
        Create personalized messages for the input users (app and push platforms).

        :param session: snowflake connection object, forwarded to CoreConfig
        :param users: users dataframe; its identifier column may be renamed in place
            (see `identify_users`)
        :param brand: forwarded to CoreConfig
        :param config_file: forwarded to CoreConfig
        :param openai_api_key: passed to `CoreConfig.set_openai_api`
        :param platform: forwarded to CoreConfig (default "push")
        :param number_of_messages: messages per user; also used for the cost report
        :param instructionset: forwarded together with number_of_messages
        :param subsequent_examples: forwarded together with number_of_messages
        :param recsys_contents: set on the config only when truthy
        :param model: LLM model name; set on the config only when not None and used
            to look up prices. NOTE(review): when None the cost estimate is 0 because
            no price is known here, even if the config falls back to a default model.
        :param identifier_column: name of the column identifying users
        :param segment_info: common information about the users (set when not None)
        :param sample_example: a sample for one shot prompting (set when not None)
        :param number_of_samples: set on the config only when not None
        :param involve_recsys_result: when truthy, messaging mode is switched to
            "recsys_result" and the flag is stored on the config
        :param messaging_mode: currently not forwarded to the config (see the
            commented-out code below)
        :param ongoing_df: currently unused by this method
        :param personalization: when truthy, `set_personalization()` is called
        :param progress_callback: forwarded to the recommendation and message
            generation steps
        :param segment_name: passed to `CoreConfig.set_segment_name`
        :return: users dataframe with generated messages, or None when there was
            no user to generate messages for
        """
        # primary processing
        users = self.identify_users(users_df=users, identifier_column=identifier_column)

        personalize_message = CoreConfig(session=session,
                                         users_df=users,
                                         brand=brand,
                                         platform=platform,
                                         config_file=config_file)

        personalize_message.set_openai_api(openai_api_key)
        personalize_message.set_segment_name(segment_name=segment_name)
        personalize_message.set_number_of_messages(number_of_messages=number_of_messages,
                                                   instructionset=instructionset,
                                                   subsequent_examples=subsequent_examples)

        # Optional settings are only applied when the caller provided them.
        if sample_example is not None:
            personalize_message.set_sample_example(sample_example)
        if number_of_samples is not None:
            personalize_message.set_number_of_samples(number_of_samples)
        if model is not None:
            personalize_message.set_llm_model(model)
        if segment_info is not None:
            personalize_message.set_segment_info(segment_info)
        if personalization:
            personalize_message.set_personalization()
        if involve_recsys_result:
            personalize_message.set_messaging_mode("recsys_result")
            personalize_message.set_involve_recsys_result(involve_recsys_result)
        # if messaging_mode != "message":
        #     personalize_message.set_messaging_mode(messaging_mode)
        if recsys_contents:
            personalize_message.set_recsys_contents(recsys_contents)

        users_df = self._create_personalized_message(CoreConfig=personalize_message,
                                                     progress_callback=progress_callback)
        if users_df is None:
            return None

        total_prompt_tokens = personalize_message.total_tokens["prompt_tokens"]
        total_completion_tokens = personalize_message.total_tokens["completion_tokens"]
        total_cost = self.calculate_cost(total_prompt_tokens, total_completion_tokens, model)

        message_count = len(users_df) * number_of_messages
        cost_report = f"Estimated Cost (USD): {total_cost:.5f} ---> Number of messages: {message_count}"
        print(cost_report)
        st.write(cost_report)

        # Every row may have been dropped as an invalid message (or zero messages
        # requested); skip the per-1000 estimate instead of dividing by zero.
        if message_count > 0:
            scale_price = (total_cost * 1000) / message_count
            scale_report = f"Estimated Cost (USD) for 1000 messages: {scale_price}"
            print(scale_report)
            st.write(scale_report)

        # Storing process can also happen after some evaluation steps
        # snowflake_conn = SnowFlakeConn(session=session, brand=brand)
        # query = snowflake_conn.generate_write_sql_query(table_name="AI_generated_messages", dataframe=users_df)
        # snowflake_conn.run_write_query(query=query, table_name="AI_generated_messages", dataframe=users_df)
        # snowflake_conn.close_connection()

        return users_df

    # -----------------------------------------------------
    def calculate_cost(self, total_prompt_tokens, total_completion_tokens, model):
        """
        Estimate the cost in USD of the consumed tokens for the given model.

        :param total_prompt_tokens: number of prompt (input) tokens
        :param total_completion_tokens: number of completion (output) tokens
        :param model: model name used to look up the per-1M-token prices
        :return: estimated cost; 0 when the model has no known price
        """
        input_price, output_price = self.get_model_price(model)
        # Prices are expressed per one million tokens.
        prompt_cost = (total_prompt_tokens / 1000000) * input_price
        completion_cost = (total_completion_tokens / 1000000) * output_price
        return prompt_cost + completion_cost

    # ====================================================
    def get_model_price(self, model):
        """
        Get the input price and output price per 1M tokens for the requested model.

        :param model: model name
        :return: tuple (input_price, output_price); (0, 0) for unknown models
        """
        return self._MODEL_PRICES.get(model, (0, 0))

    # =====================================================
    def identify_users(self, users_df, identifier_column):
        """
        Normalize the identifier column of the users dataframe.

        When the identifier column is "email" (case-insensitive) the dataframe is
        returned untouched; otherwise the identifier column is renamed to
        "USER_ID". The rename happens in place, so the caller's dataframe is
        modified as well.

        :param users_df: users dataframe
        :param identifier_column: name of the column identifying users
        :return: updated users
        """
        if identifier_column.upper() != "EMAIL":
            users_df.rename(columns={identifier_column: "USER_ID"}, inplace=True)
        return users_df

    # ------------------------------------------------------------------
    @staticmethod
    def _drop_empty_messages(users_df):
        """
        Keep only the rows whose "message" value is non-null and not blank.

        :param users_df: dataframe with a "message" column
        :return: filtered dataframe (rows with null, empty or whitespace-only
            messages removed)
        """
        messages = users_df["message"]
        # notna() is checked explicitly: NaN is truthy, so a plain truthiness
        # test on the stripped text would let null messages through.
        has_text = messages.astype(str).str.strip().ne("")
        return users_df[messages.notna() & has_text]

    # ------------------------------------------------------------------
    def _create_personalized_message(self, CoreConfig, progress_callback):
        """
        Main function of the class: flows the work between the pipeline steps in
        order to create personalized messages.

        :param CoreConfig: configured CoreConfig instance (the parameter name
            shadows the imported class inside this method)
        :param progress_callback: forwarded to the recommendation and message
            generation steps
        :return: updated users_df with extracted information and personalized
            messages, or None when there is no user to generate messages for
        """
        # Local alias so the instance is not confused with the CoreConfig class.
        config = CoreConfig

        # Collecting all the data that we need to personalize messages
        config = DataCollector(config).gather_data()

        if len(config.users_df) == 0:
            print("There is no user to generate messages")
            return None

        if config.involve_recsys_result and config.messaging_mode != "message":
            # generating recommendations for users to include them in the message
            recommender = LLMR(config, random=True)
            config = recommender.get_recommendations(progress_callback)
        else:
            # We only want to generate the message and redirect them to For You section or Homepage
            recommender = DefaultRec(config)
            config = recommender.get_recommendations()

        # generating proper prompt for each user
        config = PromptGenerator(config).generate_prompts()

        # generating messages for each user
        config = MessageGenerator(config).generate_messages(progress_callback)

        # Eliminating rows where we don't have a valid message (null, empty, or whitespace only)
        config.users_df = self._drop_empty_messages(config.users_df)
        config.checkpoint()

        # closing snowflake connection
        # config.session.close()

        return config.users_df
|