Danialebrat's picture
- updating the system to be able to work with all brands
ce32f1f
"""
the flow of the Program starts from create_personalized_message function
"""
import time
from tqdm import tqdm
from Messaging_system.DataCollector import DataCollector
from Messaging_system.CoreConfig import CoreConfig
from Messaging_system.LLMR import LLMR
import streamlit as st
from Messaging_system.Message_generator import MessageGenerator
from Messaging_system.PromptGenerator import PromptGenerator
from Messaging_system.SnowFlakeConnection import SnowFlakeConn
from Messaging_system.Homepage_Recommender import DefaultRec
class Permes:
"""
LLM-based personalized message generator:
"""
def create_personalize_messages(self, session, users, brand, config_file, openai_api_key,
platform="push", number_of_messages=1, instructionset=None, subsequent_examples=None
, recsys_contents=None, model=None, identifier_column="user_id", segment_info=None,
sample_example=None, number_of_samples=None, involve_recsys_result=False,
messaging_mode="message", ongoing_df=None, personalization=False,
progress_callback=None, segment_name="no_recent_activity"):
"""
creating personalized messages for the input users given the parameters for both app and push platform.
:param session: snowflake connection object
:param users: users dataframe
:param brand
:param config_file
:param openai_api_key
:param CTA: call to action for the messages
:param segment_info: common information about the users
:param message_style: style of the message
:param sample_example: a sample for one shot prompting
:return:
"""
# primary processing
users = self.identify_users(users_df=users, identifier_column=identifier_column)
personalize_message = CoreConfig(session=session,
users_df=users,
brand=brand,
platform=platform,
config_file=config_file)
personalize_message.set_openai_api(openai_api_key)
personalize_message.set_segment_name(segment_name=segment_name)
personalize_message.set_number_of_messages(number_of_messages=number_of_messages,
instructionset=instructionset,
subsequent_examples=subsequent_examples)
if sample_example is not None: # Check if sample_example is not empty
personalize_message.set_sample_example(sample_example)
if number_of_samples is not None:
personalize_message.set_number_of_samples(number_of_samples)
if model is not None:
personalize_message.set_llm_model(model)
if segment_info is not None:
personalize_message.set_segment_info(segment_info)
if personalization:
personalize_message.set_personalization()
if involve_recsys_result:
personalize_message.set_messaging_mode("recsys_result")
personalize_message.set_involve_recsys_result(involve_recsys_result)
# if messaging_mode != "message":
# personalize_message.set_messaging_mode(messaging_mode)
if recsys_contents:
personalize_message.set_recsys_contents(recsys_contents)
users_df = self._create_personalized_message(CoreConfig=personalize_message, progress_callback=progress_callback)
if users_df is None:
return None
total_prompt_tokens = personalize_message.total_tokens["prompt_tokens"]
total_completion_tokens = personalize_message.total_tokens["completion_tokens"]
total_cost = self.calculate_cost(total_prompt_tokens, total_completion_tokens, model)
print(f"Estimated Cost (USD): {total_cost:.5f} ---> Number of messages: {(len(users_df) * number_of_messages)}")
st.write(f"Estimated Cost (USD): {total_cost:.5f} ---> Number of messages: {(len(users_df) * number_of_messages)}")
scale_price = (total_cost * 1000) / (len(users_df) * number_of_messages)
print(f"Estimated Cost (USD) for 1000 messages: {scale_price}")
st.write(f"Estimated Cost (USD) for 1000 messages: {scale_price}")
# Storing process can also happen after some evaluation steps
# snowflake_conn = SnowFlakeConn(session=session, brand=brand)
# query = snowflake_conn.generate_write_sql_query(table_name="AI_generated_messages", dataframe=users_df)
# snowflake_conn.run_write_query(query=query, table_name="AI_generated_messages", dataframe=users_df)
# snowflake_conn.close_connection()
return users_df
# -----------------------------------------------------
def calculate_cost(self, total_prompt_tokens, total_completion_tokens, model):
input_price, output_price = self.get_model_price(model)
total_cost = ((total_prompt_tokens / 1000000) * input_price) + (
(total_completion_tokens / 1000000) * output_price) # Cost calculation estimation
return total_cost
# ====================================================
def get_model_price(self, model):
"""
getting the input price and output price per 1m token for the requested model
:param model:
:return:
"""
input_prices = {
"gpt-4o-mini":0.15,
"gpt-4.1-mini":0.4,
"gpt-5-mini": 0.25,
"gpt-5-nano": 0.05,
"gemini-2.5-flash":0.3,
"gemini-2.0-flash":0.1,
"gemini-2.5-flash-lite":0.1,
"claude-3-5-haiku-latest":0.8,
"google/gemma-3-27b-instruct/bf-16": 0.15
}
out_prices = {
"gpt-4o-mini":0.6,
"gpt-4.1-mini":1.6,
"gpt-5-mini": 2,
"gpt-5-nano": 0.4,
"gemini-2.5-flash":2.5,
"gemini-2.0-flash":0.7,
"gemini-2.5-flash-lite":0.4,
"claude-3-5-haiku-latest":3,
"google/gemma-3-27b-instruct/bf-16": 0.3
}
i_price = input_prices.get(model, 0)
o_price= out_prices.get(model, 0)
return i_price, o_price
# =====================================================
def identify_users(self, users_df, identifier_column):
"""
specifying the users for identification
:param identifier_column:
:return: updated users
"""
if identifier_column.upper() == "EMAIL":
return users_df
else:
users_df.rename(columns={identifier_column: "USER_ID"}, inplace=True)
return users_df
# ------------------------------------------------------------------
def _create_personalized_message(self, CoreConfig, progress_callback):
"""
main function of the class to flow the work between functions inorder to create personalized messages.
:return: updated users_df with extracted information and personalize messages.
"""
# Collecting all the data that we need to personalize messages
datacollect = DataCollector(CoreConfig)
CoreConfig = datacollect.gather_data()
if len(CoreConfig.users_df) == 0:
print("There is no user to generate messages")
return None
# generating recommendations for users, if we want to include recommendations in the message
if CoreConfig.involve_recsys_result and CoreConfig.messaging_mode != "message":
Recommender = LLMR(CoreConfig, random=True)
CoreConfig = Recommender.get_recommendations(progress_callback)
else:
# We only want to generate the message and redirect them to For You section or Homepage
Recommender = DefaultRec(CoreConfig)
CoreConfig = Recommender.get_recommendations()
# generating proper prompt for each user
prompt = PromptGenerator(CoreConfig)
CoreConfig = prompt.generate_prompts()
# generating messages for each user
message_generator = MessageGenerator(CoreConfig)
CoreConfig = message_generator.generate_messages(progress_callback)
# Eliminating rows where we don't have a valid message (null, empty, or whitespace only)
CoreConfig.users_df = CoreConfig.users_df[CoreConfig.users_df["message"].str.strip().astype(bool)]
CoreConfig.checkpoint()
# closing snowflake connection
# CoreConfig.session.close()
return CoreConfig.users_df