"""
Entry point of the personalized-messaging pipeline.

The flow of the program starts from ``Permes.create_personalize_messages``.
"""
import time

import streamlit as st
from tqdm import tqdm

from Messaging_system.DataCollector import DataCollector
from Messaging_system.CoreConfig import CoreConfig
from Messaging_system.LLMR import LLMR
from Messaging_system.Message_generator import MessageGenerator
from Messaging_system.PromptGenerator import PromptGenerator
from Messaging_system.SnowFlakeConnection import SnowFlakeConn
from Messaging_system.Homepage_Recommender import DefaultRec


class Permes:
    """
    LLM-based personalized message generator.

    The public entry point is ``create_personalize_messages``; the remaining
    methods are helpers for user identification, cost estimation and the
    actual data -> recommendation -> prompt -> message pipeline.
    """

    # USD price per 1M tokens as (input_price, output_price), keyed by model name.
    _MODEL_PRICES_PER_1M = {
        "gpt-4o-mini": (0.15, 0.6),
        "gpt-4.1-mini": (0.4, 1.6),
        "gpt-5-mini": (0.25, 2),
        "gpt-5-nano": (0.05, 0.4),
        "gemini-2.5-flash": (0.3, 2.5),
        "gemini-2.0-flash": (0.1, 0.7),
        "gemini-2.5-flash-lite": (0.1, 0.4),
        "claude-3-5-haiku-latest": (0.8, 3),
        "google/gemma-3-27b-instruct/bf-16": (0.15, 0.3),
    }

    def create_personalize_messages(self, session, users, brand, config_file, openai_api_key,
                                    platform="push", number_of_messages=1, instructionset=None,
                                    subsequent_examples=None, recsys_contents=None, model=None,
                                    identifier_column="user_id", segment_info=None,
                                    sample_example=None, number_of_samples=None,
                                    involve_recsys_result=False, messaging_mode="message",
                                    ongoing_df=None, personalization=False,
                                    progress_callback=None, segment_name="no_recent_activity"):
        """
        Create personalized messages for the input users (app and push platforms).

        Builds a ``CoreConfig`` from the arguments, runs the generation pipeline,
        and reports an estimated cost through ``print`` and ``st.write``.

        :param session: snowflake connection object
        :param users: users dataframe
        :param brand: forwarded to ``CoreConfig``
        :param config_file: forwarded to ``CoreConfig``
        :param openai_api_key: API key registered on the config
        :param platform: target platform, forwarded to ``CoreConfig``
        :param number_of_messages: messages per user; also used for the cost report
        :param instructionset: forwarded to ``set_number_of_messages``
        :param subsequent_examples: forwarded to ``set_number_of_messages``
        :param recsys_contents: registered on the config when truthy
        :param model: LLM model name; set on the config when not None and used
            to look up token prices for the cost estimate
        :param identifier_column: column identifying users (see ``identify_users``)
        :param segment_info: common information about the users
        :param sample_example: a sample for one shot prompting
        :param number_of_samples: registered on the config when not None
        :param involve_recsys_result: when truthy, switches the messaging mode
            to ``"recsys_result"`` and records the flag on the config
        :param messaging_mode: accepted for compatibility; not used by this method
        :param ongoing_df: accepted for compatibility; not used by this method
        :param personalization: when truthy, enables personalization on the config
        :param progress_callback: forwarded to the generation pipeline
        :param segment_name: registered on the config
        :return: dataframe of users with generated messages, or None when there
            is no user to generate messages for
        """
        # primary processing
        users = self.identify_users(users_df=users, identifier_column=identifier_column)

        personalize_message = CoreConfig(session=session,
                                         users_df=users,
                                         brand=brand,
                                         platform=platform,
                                         config_file=config_file)
        personalize_message.set_openai_api(openai_api_key)
        personalize_message.set_segment_name(segment_name=segment_name)
        personalize_message.set_number_of_messages(number_of_messages=number_of_messages,
                                                   instructionset=instructionset,
                                                   subsequent_examples=subsequent_examples)

        # Optional settings are only applied when the caller supplied them.
        if sample_example is not None:
            personalize_message.set_sample_example(sample_example)

        if number_of_samples is not None:
            personalize_message.set_number_of_samples(number_of_samples)

        if model is not None:
            personalize_message.set_llm_model(model)

        if segment_info is not None:
            personalize_message.set_segment_info(segment_info)

        if personalization:
            personalize_message.set_personalization()

        if involve_recsys_result:
            personalize_message.set_messaging_mode("recsys_result")
            personalize_message.set_involve_recsys_result(involve_recsys_result)

        # NOTE: `messaging_mode` is intentionally not forwarded to the config here.

        if recsys_contents:
            personalize_message.set_recsys_contents(recsys_contents)

        users_df = self._create_personalized_message(CoreConfig=personalize_message,
                                                     progress_callback=progress_callback)
        if users_df is None:
            return None

        # NOTE(review): assumes the pipeline stages update and return this same
        # config object, so the token totals read below reflect the run -- confirm.
        total_prompt_tokens = personalize_message.total_tokens["prompt_tokens"]
        total_completion_tokens = personalize_message.total_tokens["completion_tokens"]

        # NOTE(review): prices are looked up with the `model` argument; when it is
        # None (or unknown) the lookup falls back to 0 and the estimate is 0.
        total_cost = self.calculate_cost(total_prompt_tokens, total_completion_tokens, model)
        message_count = len(users_df) * number_of_messages

        cost_summary = f"Estimated Cost (USD): {total_cost:.5f} ---> Number of messages: {message_count}"
        print(cost_summary)
        st.write(cost_summary)

        # Every message may have been filtered out as invalid (or zero messages
        # requested); a per-1000 price is undefined then, so skip it rather than
        # dividing by zero.
        if message_count > 0:
            scale_price = (total_cost * 1000) / message_count
            scaled_summary = f"Estimated Cost (USD) for 1000 messages: {scale_price}"
            print(scaled_summary)
            st.write(scaled_summary)

        # Storing the results (e.g. writing to the "AI_generated_messages" table via
        # SnowFlakeConn) is deliberately not done here; it can happen after some
        # evaluation steps.
        return users_df

    # -----------------------------------------------------
    def calculate_cost(self, total_prompt_tokens, total_completion_tokens, model):
        """
        Estimate the cost in USD of a run from its token usage.

        :param total_prompt_tokens: number of input (prompt) tokens
        :param total_completion_tokens: number of output (completion) tokens
        :param model: model name used to look up the per-1M-token prices
        :return: estimated total cost
        """
        input_price, output_price = self.get_model_price(model)

        # Prices are quoted per 1M tokens.
        total_cost = ((total_prompt_tokens / 1000000) * input_price) + (
                (total_completion_tokens / 1000000) * output_price)
        return total_cost

    # ====================================================
    def get_model_price(self, model):
        """
        Get the input price and output price per 1M tokens for the requested model.

        :param model: model name
        :return: ``(input_price, output_price)``; ``(0, 0)`` when the model is
            not in the price table
        """
        return self._MODEL_PRICES_PER_1M.get(model, (0, 0))

    # =====================================================
    def identify_users(self, users_df, identifier_column):
        """
        Normalize the column that identifies users.

        When the identifier is the email column (case-insensitive) the dataframe
        is returned as is; otherwise the identifier column is renamed to
        ``"USER_ID"``.

        :param users_df: users dataframe
        :param identifier_column: name of the column that identifies users
        :return: users dataframe with the identifier column normalized
        """
        if identifier_column.upper() == "EMAIL":
            return users_df

        # Return a renamed copy so the caller's dataframe is not modified in place.
        return users_df.rename(columns={identifier_column: "USER_ID"})

    # ------------------------------------------------------------------
    def _create_personalized_message(self, CoreConfig, progress_callback):
        """
        Run the pipeline that produces the personalized messages.

        Stages, in order: data collection, recommendations, prompt generation,
        message generation, removal of rows without a valid message, checkpoint.

        :param CoreConfig: configured ``CoreConfig`` instance (the name shadows
            the imported class inside this method; kept because it is passed by
            keyword)
        :param progress_callback: forwarded to the recommender (when recsys
            results are involved) and to the message generator
        :return: updated users_df with extracted information and personalized
            messages, or None when there is no user to generate messages for
        """
        config = CoreConfig

        # Collecting all the data that we need to personalize messages
        config = DataCollector(config).gather_data()

        if len(config.users_df) == 0:
            print("There is no user to generate messages")
            return None

        if config.involve_recsys_result and config.messaging_mode != "message":
            # Include recommendations in the message
            recommender = LLMR(config, random=True)
            config = recommender.get_recommendations(progress_callback)
        else:
            # Only generate the message and redirect users to the For You
            # section or Homepage
            recommender = DefaultRec(config)
            config = recommender.get_recommendations()

        # generating proper prompt for each user
        config = PromptGenerator(config).generate_prompts()

        # generating messages for each user
        config = MessageGenerator(config).generate_messages(progress_callback)

        # Eliminating rows where we don't have a valid message (null, empty, or
        # whitespace only). Missing values are masked explicitly: NaN is truthy,
        # so a plain ``.str.strip().astype(bool)`` would keep them.
        messages = config.users_df["message"]
        has_text = messages.notna() & messages.astype(str).str.strip().astype(bool)
        config.users_df = config.users_df[has_text]

        config.checkpoint()

        # The snowflake session is intentionally left open here.
        return config.users_df