"""
The flow of the program starts from the ``Permes.create_personalize_messages`` method.
"""
| |
|
| |
|
| | import time |
| | from tqdm import tqdm |
| | from Messaging_system.DataCollector import DataCollector |
| | from Messaging_system.CoreConfig import CoreConfig |
| | from Messaging_system.LLMR import LLMR |
| | import streamlit as st |
| | from Messaging_system.Message_generator import MessageGenerator |
| | from Messaging_system.PromptGenerator import PromptGenerator |
| | from Messaging_system.SnowFlakeConnection import SnowFlakeConn |
| | from Messaging_system.Homepage_Recommender import DefaultRec |
| |
|
| |
|
| |
|
class Permes:
    """
    LLM-based personalized message generator.

    Public entry point is :meth:`create_personalize_messages`: it builds a
    ``CoreConfig`` from the caller's parameters and hands it to the internal
    pipeline (data collection -> recommendation -> prompt generation ->
    message generation), then reports an estimated LLM cost.
    """

    def create_personalize_messages(self, session, users, brand, config_file, openai_api_key,
                                    platform="push", number_of_messages=1, instructionset=None,
                                    subsequent_examples=None, recsys_contents=None, model=None,
                                    identifier_column="user_id", segment_info=None,
                                    sample_example=None, number_of_samples=None, involve_recsys_result=False,
                                    messaging_mode="message", ongoing_df=None, personalization=False,
                                    progress_callback=None, segment_name="no_recent_activity"):
        """
        Create personalized messages for the input users for both app and push platforms.

        :param session: snowflake connection object
        :param users: users dataframe
        :param brand: brand the messages are generated for
        :param config_file: configuration passed through to CoreConfig
        :param openai_api_key: API key forwarded to the LLM client
        :param platform: target platform ("push" by default)
        :param number_of_messages: messages to generate per user
        :param instructionset: optional extra instructions for message generation
        :param subsequent_examples: optional examples for messages after the first
        :param recsys_contents: optional recommender-system contents to inject
        :param model: LLM model name; also drives the cost estimate
        :param identifier_column: user-identifier column name ("user_id" or "email")
        :param segment_info: common information about the users
        :param sample_example: a sample for one-shot prompting
        :param number_of_samples: number of samples to use for prompting
        :param involve_recsys_result: when True, switches messaging mode to "recsys_result"
        :param messaging_mode: requested messaging mode.
            NOTE(review): currently unused by this method — confirm intent.
        :param ongoing_df: NOTE(review): currently unused by this method — confirm intent.
        :param personalization: when True, enables personalization in CoreConfig
        :param progress_callback: optional callable reporting pipeline progress
        :param segment_name: name of the user segment being messaged
        :return: users dataframe enriched with generated messages, or None when
            the pipeline produced no users to message
        """
        users = self.identify_users(users_df=users, identifier_column=identifier_column)

        personalize_message = CoreConfig(session=session,
                                         users_df=users,
                                         brand=brand,
                                         platform=platform,
                                         config_file=config_file)

        personalize_message.set_openai_api(openai_api_key)
        personalize_message.set_segment_name(segment_name=segment_name)
        personalize_message.set_number_of_messages(number_of_messages=number_of_messages,
                                                   instructionset=instructionset,
                                                   subsequent_examples=subsequent_examples)

        # Optional knobs: only forwarded when the caller supplied them.
        if sample_example is not None:
            personalize_message.set_sample_example(sample_example)

        if number_of_samples is not None:
            personalize_message.set_number_of_samples(number_of_samples)

        if model is not None:
            personalize_message.set_llm_model(model)

        if segment_info is not None:
            personalize_message.set_segment_info(segment_info)

        if personalization:
            personalize_message.set_personalization()

        if involve_recsys_result:
            # Involving recsys results forces the "recsys_result" messaging mode,
            # overriding whatever `messaging_mode` the caller passed.
            personalize_message.set_messaging_mode("recsys_result")
            personalize_message.set_involve_recsys_result(involve_recsys_result)

        if recsys_contents:
            personalize_message.set_recsys_contents(recsys_contents)

        users_df = self._create_personalized_message(CoreConfig=personalize_message,
                                                     progress_callback=progress_callback)

        if users_df is None:
            return None

        total_prompt_tokens = personalize_message.total_tokens["prompt_tokens"]
        total_completion_tokens = personalize_message.total_tokens["completion_tokens"]

        total_cost = self.calculate_cost(total_prompt_tokens, total_completion_tokens, model)

        print(f"Estimated Cost (USD): {total_cost:.5f} ---> Number of messages: {(len(users_df) * number_of_messages)}")
        st.write(f"Estimated Cost (USD): {total_cost:.5f} ---> Number of messages: {(len(users_df) * number_of_messages)}")

        # Guard against ZeroDivisionError: the pipeline drops rows with empty
        # messages, so users_df can legitimately be empty here (and
        # number_of_messages could be 0).
        message_count = len(users_df) * number_of_messages
        if message_count:
            scale_price = (total_cost * 1000) / message_count
            print(f"Estimated Cost (USD) for 1000 messages: {scale_price}")
            st.write(f"Estimated Cost (USD) for 1000 messages: {scale_price}")

        return users_df

    def calculate_cost(self, total_prompt_tokens, total_completion_tokens, model):
        """
        Estimate the USD cost of an LLM run from its token counts.

        Prices are quoted per 1M tokens (see :meth:`get_model_price`), so the
        counts are scaled down by 1_000_000 before multiplying.

        :param total_prompt_tokens: total prompt (input) tokens consumed
        :param total_completion_tokens: total completion (output) tokens produced
        :param model: model name used to look up per-token pricing
        :return: estimated cost in USD (0 for unknown models)
        """
        input_price, output_price = self.get_model_price(model)

        total_cost = ((total_prompt_tokens / 1000000) * input_price) + (
            (total_completion_tokens / 1000000) * output_price)

        return total_cost

    def get_model_price(self, model):
        """
        Get the input price and output price per 1M tokens for the requested model.

        Unknown models price as (0, 0), which makes the downstream cost
        estimate read as free — callers should treat a zero estimate for an
        unlisted model as "pricing unavailable", not "free".

        :param model: model name
        :return: tuple of (input_price_per_1m, output_price_per_1m) in USD
        """
        input_prices = {
            "gpt-4o-mini": 0.15,
            "gpt-4.1-mini": 0.4,
            "gpt-5-mini": 0.25,
            "gpt-5-nano": 0.05,
            "gemini-2.5-flash": 0.3,
            "gemini-2.0-flash": 0.1,
            "gemini-2.5-flash-lite": 0.1,
            "claude-3-5-haiku-latest": 0.8,
            "google/gemma-3-27b-instruct/bf-16": 0.15
        }

        out_prices = {
            "gpt-4o-mini": 0.6,
            "gpt-4.1-mini": 1.6,
            "gpt-5-mini": 2,
            "gpt-5-nano": 0.4,
            "gemini-2.5-flash": 2.5,
            "gemini-2.0-flash": 0.7,
            "gemini-2.5-flash-lite": 0.4,
            "claude-3-5-haiku-latest": 3,
            "google/gemma-3-27b-instruct/bf-16": 0.3
        }

        i_price = input_prices.get(model, 0)
        o_price = out_prices.get(model, 0)

        return i_price, o_price

    def identify_users(self, users_df, identifier_column):
        """
        Normalize the user-identifier column to the name the pipeline expects.

        "EMAIL" (case-insensitive) is already the expected identifier; any
        other column is renamed to "USER_ID". The rename now returns a new
        dataframe instead of mutating the caller's frame in place (the
        previous ``inplace=True`` silently modified the caller's data).

        :param users_df: users dataframe
        :param identifier_column: name of the identifier column in users_df
        :return: dataframe whose identifier column is "EMAIL" or "USER_ID"
        """
        if identifier_column.upper() == "EMAIL":
            return users_df
        return users_df.rename(columns={identifier_column: "USER_ID"})

    def _create_personalized_message(self, CoreConfig, progress_callback):
        """
        Run the message-generation pipeline on a fully configured CoreConfig.

        Stages: data collection -> recommendation (LLM-based or default) ->
        prompt generation -> message generation -> filtering of empty
        messages -> checkpoint.

        :param CoreConfig: configured CoreConfig instance. NOTE: the parameter
            name shadows the imported ``CoreConfig`` class; kept for
            backward compatibility with keyword callers.
        :param progress_callback: optional progress-reporting callable
        :return: updated users_df with generated messages, or None when there
            are no users to message after data collection
        """
        # Alias to avoid leaning on the class-shadowing parameter name below.
        cfg = CoreConfig

        cfg = DataCollector(cfg).gather_data()

        if len(cfg.users_df) == 0:
            print("There is no user to generate messages")
            return None

        if cfg.involve_recsys_result and cfg.messaging_mode != "message":
            recommender = LLMR(cfg, random=True)
            cfg = recommender.get_recommendations(progress_callback)
        else:
            recommender = DefaultRec(cfg)
            cfg = recommender.get_recommendations()

        cfg = PromptGenerator(cfg).generate_prompts()

        cfg = MessageGenerator(cfg).generate_messages(progress_callback)

        # Drop rows whose generated message is empty or whitespace-only,
        # then persist a checkpoint of the surviving state.
        cfg.users_df = cfg.users_df[cfg.users_df["message"].str.strip().astype(bool)]
        cfg.checkpoint()

        return cfg.users_df
| |
|
| |
|
| |
|
| |
|