"""
This module defines an LLM-based recommender that chooses the most suitable content
for each user, given the user's profile and our goal.
"""
| | import json |
| | import os |
| | import random |
| |
|
| | import pandas as pd |
| | import openai |
| | from openai import OpenAI |
| | from dotenv import load_dotenv |
| | import time |
| | import streamlit as st |
| | from tqdm import tqdm |
| | from Messaging_system.Homepage_Recommender import DefaultRec |
| | load_dotenv() |
| |
|
| |
|
| | |
class LLMR:
    """LLM-based recommender that selects one content item per user.

    For every row of ``CoreConfig.users_df`` the recommender builds a list of
    candidate contents from the row's ``recsys_result`` JSON (falling back to
    the popular contents), then either asks an OpenAI chat model to choose one
    of them or, when ``random`` is true, picks one at random.
    """

    def __init__(self, CoreConfig, random=False):
        """
        :param CoreConfig: shared configuration/state object; this class reads
            ``users_df``, ``content_info``, ``popular_contents_df``,
            ``recsys_contents``, ``config_file``, ``api_key`` and ``model``
            from it and updates its token counters.
        :param random: when true, skip the LLM and pick a candidate at random.
        """
        self.Core = CoreConfig
        self.user = None
        self.selected_content_ids = []
        self.random = random

    def get_recommendations(self, progress_callback):
        """
        Select the recommended content for every user in ``Core.users_df``.

        Writes the ``recommendation``, ``recommendation_info`` and
        ``recsys_result`` columns. Users for whom no recommendation could be
        produced receive the values exposed by ``DefaultRec``.

        :param progress_callback: optional callable invoked as
            ``progress_callback(zero_based_index, total_users)`` before each user.
        :return: the (mutated) ``Core`` object
        """
        default = DefaultRec(self.Core)

        self.Core.users_df["recommendation"] = None
        self.Core.users_df["recommendation_info"] = None
        total_users = len(self.Core.users_df)

        st.write("Choosing the best content to recommend ... ")

        self.Core.start_time = time.time()
        # iterrows() is a generator, so tqdm needs an explicit total to show progress.
        rows = tqdm(
            self.Core.users_df.iterrows(),
            total=total_users,
            desc="Selecting the best content to recommend ...",
        )
        for progress, (idx, row) in enumerate(rows):
            if progress_callback is not None:
                progress_callback(progress, total_users)

            self.user = row
            recommendation_dict, content_info, recsys_json, token = self._get_recommendation()

            # Every failure path returns None instead of a dict, so test the
            # dict itself before looking at its content_id.
            if not recommendation_dict or recommendation_dict.get("content_id") is None:
                self.Core.users_df.at[idx, "recommendation"] = default.recommendation
                self.Core.users_df.at[idx, "recommendation_info"] = default.recommendation_info
                self.Core.users_df.at[idx, "recsys_result"] = default.for_you_url
                continue

            prompt_tokens = int(token['prompt_tokens'])
            completion_tokens = int(token['completion_tokens'])
            self.Core.total_tokens['prompt_tokens'] += prompt_tokens
            self.Core.total_tokens['completion_tokens'] += completion_tokens
            self.Core.temp_token_counter = prompt_tokens + completion_tokens
            self.Core.users_df.at[idx, "recommendation"] = recommendation_dict
            self.Core.users_df.at[idx, "recommendation_info"] = content_info
            self.Core.users_df.at[idx, "recsys_result"] = recsys_json
            self.Core.respect_request_ratio()

        return self.Core

    def _get_recommendation(self):
        """
        Select the recommendation for ``self.user`` from the available contents.

        :return: ``(recommendation_dict, content_info, recsys_json, tokens)``,
            or four ``None`` values when no recommendation could be made.
        """
        if self.random:
            return self._get_recommendation_random()

        no_recommendation = (None, None, None, None)

        prompt, recsys_json = self._generate_prompt()
        if prompt is None:
            return no_recommendation

        content_id, tokens = self.get_llm_response(prompt)
        if content_id == 0:
            # get_llm_response signals failure with a content_id of 0.
            return no_recommendation

        content_info = self._get_content_info(content_id)
        recommendation_dict = self._get_recommendation_info(content_id, json.loads(recsys_json))
        if content_info is None or recommendation_dict is None:
            # Same policy as the random path: an item without catalogue info
            # or recsys metadata is not usable.
            return no_recommendation

        return recommendation_dict, content_info, recsys_json, tokens

    def _generate_prompt(self):
        """
        Build the prompt used to choose a recommendation for ``self.user``.

        :return: ``(prompt, recsys_json)``; ``prompt`` is ``None`` when there
            are no available contents to choose from.
        """
        available_contents, recsys_json = self._get_available_contents()
        if not available_contents.strip():
            # Always return a pair: the caller unpacks two values.
            return None, recsys_json

        input_context = self._input_context()
        user_info = self._get_user_profile()
        task = self._task_instructions()
        output_instruction = self._output_instruction()

        prompt = f"""
        ### Context:
        {input_context}

        ### User Information:
        {user_info}

        ### Available Contents:
        {available_contents}

        ### Main Task:
        {task}

        ### Output Instructions:
        {output_instruction}
        """

        return prompt, recsys_json

    def _input_context(self):
        """
        :return: input instructions as a string
        """
        context = """
        You are a helpful educational music content recommender. Your goal is to choose a perfect content to recommend to the user given the information that we have from the user and available contents to recommend.
        """

        return context

    def _system_instructions(self):
        """
        High-level system context, sent as the ``role='system'`` message.

        :return: system instructions as a string
        """
        context = """
        You are a helpful educational music content recommender
        """

        return context

    def _task_instructions(self):
        """
        Create the instructions describing the task.

        :return: task
        """
        task = """
        - You must select exactly ONE content from the 'Available Contents' to recommend.
        - Base your decision on the User information and focus on providing the most relevant recommendation.
        - Do not recommended content where the topic is focused on a specific Gear (e.g. YAMAHA)
        - Provide the content_id of the recommended content in the output based on Output instructions.
        """

        return task

    def _get_user_profile(self):
        """
        Describe the user's information and last completed content for the prompt.

        :return: formatted user-profile section as a string
        """
        last_completed_content = self._get_user_data(attribute="last_completed_content")
        user_info = self._get_user_data(attribute="user_info")

        recommendation_info = f"""
        **User information and preferences:**

        {user_info}

        **Previous completed content:**
        {last_completed_content}
        """

        return recommendation_info

    def _get_user_data(self, attribute):
        """
        Get the current user's value for the requested attribute.

        :param attribute: key to read from ``self.user``
        :return: the stored value, or ``"Not Available"`` when it is missing
            (None/NaN), an empty container, or a blank string
        """
        value = self.user[attribute]

        if isinstance(value, str):
            return value if value.strip() else "Not Available"

        # Containers are handled before pd.isna: on a list it returns an
        # array whose truth value is ambiguous.
        if isinstance(value, (list, tuple, set, dict)):
            return value if value else "Not Available"

        if pd.api.types.is_scalar(value) and pd.isna(value):
            return "Not Available"

        return value

    def _get_user_recommendation(self):
        """
        Return the JSON string the candidates should be drawn from.

        :return: the user's ``recsys_result`` when it parses and contains at
            least one of ``Core.recsys_contents``; otherwise the first
            ``popular_content`` entry of ``Core.popular_contents_df``
        """
        recsys_json = self.user["recsys_result"]

        try:
            recsys_data = json.loads(recsys_json)
            has_known_section = any(
                section in recsys_data for section in self.Core.recsys_contents
            )
        except (TypeError, ValueError):
            # Missing (None/NaN), malformed JSON, or a payload that does not
            # support membership tests.
            has_known_section = False

        if has_known_section:
            return recsys_json
        return self.Core.popular_contents_df.iloc[0]["popular_content"]

    def _get_available_contents(self):
        """
        Build the "Available Contents" prompt section for the current user.

        Takes the top three items (by ``recommendation_rank``) of every
        section, de-duplicated in order, and stores their ids in
        ``self.selected_content_ids``.

        :return: ``(text, recsys_json)``; ``text`` is empty when there are no
            candidates or the JSON cannot be parsed
        """
        recsys_json = self._get_user_recommendation()
        try:
            recsys_data = json.loads(recsys_json)
        except (TypeError, ValueError):
            self.selected_content_ids = []
            return "", recsys_json

        top_recs = self._collect_top_k(recsys_data, self.Core.recsys_contents, k=3)
        # dict.fromkeys keeps first-seen order while dropping items that
        # appear in more than one section.
        selected_content_ids = list(dict.fromkeys(
            rec["content_id"] for rec in top_recs if rec.get("content_id") is not None
        ))

        catalogue = self.Core.content_info
        content_info_rows = catalogue[catalogue['content_id'].isin(selected_content_ids)]
        content_info_map = dict(zip(content_info_rows['content_id'], content_info_rows['content_info']))

        lines = []
        for content_id in selected_content_ids:
            lines.extend([
                f"**content_id**: {content_id}",
                "**content_info**:",
                content_info_map.get(content_id, "No content info found"),
                "",
            ])

        self.selected_content_ids = selected_content_ids

        return "\n".join(lines), recsys_json

    def _get_content_info(self, content_id):
        """
        Get the ``content_info`` text for a content id.

        :param content_id: id to look up in ``Core.content_info``
        :return: the first matching ``content_info`` value, or ``None`` when
            the id is not present
        """
        catalogue = self.Core.content_info
        matches = catalogue.loc[catalogue['content_id'] == content_id, 'content_info']
        if matches.empty:
            return None
        return matches.iloc[0]

    def is_valid_content_id(self, content_id):
        """
        Check whether the LLM's answer is one of the offered content ids.

        :param content_id: id returned by the LLM
        :return: True when the id is in ``self.selected_content_ids``
        """
        return content_id in self.selected_content_ids

    def _output_instruction(self):
        """
        :return: output instructions as a string
        """
        instructions = f"""
        Return the content_id of the final recommendation in **JSON** format with the following structure:

        {{
            "content_id": "content_id of the recommended content from Available Contents, as an integer",
        }}

        Do not include any additional keys or text outside the JSON.
        """

        return instructions

    def get_llm_response(self, prompt, max_retries=4):
        """
        Send the prompt to the LLM and return the chosen content id.

        :param prompt: user prompt built by ``_generate_prompt``
        :param max_retries: maximum number of attempts
        :return: ``(content_id, tokens)`` on success, where ``tokens`` holds
            the usage of the successful call; ``(0, 0)`` when every attempt failed
        """
        openai.api_key = self.Core.api_key
        instructions = self._system_instructions()
        client = OpenAI(api_key=self.Core.api_key)

        for attempt in range(max_retries):
            try:
                response = client.chat.completions.create(
                    model=self.Core.model,
                    response_format={"type": "json_object"},
                    messages=[
                        {"role": "system", "content": instructions},
                        {"role": "user", "content": prompt},
                    ],
                    max_tokens=20,
                    n=1,
                    temperature=0.7,
                )
            except openai.APIConnectionError as e:
                print("The server could not be reached")
                print(e.__cause__)
                continue
            except openai.RateLimitError:
                print("A 429 status code was received; we should back off a bit.")
                # Actually back off before retrying; no point sleeping after
                # the final attempt.
                if attempt < max_retries - 1:
                    time.sleep(min(2 ** attempt, 8))
                continue
            except openai.APIStatusError as e:
                print("Another non-200-range status code was received")
                print(e.status_code)
                print(e.response)
                continue

            tokens = {
                'prompt_tokens': response.usage.prompt_tokens,
                'completion_tokens': response.usage.completion_tokens,
                'total_tokens': response.usage.total_tokens,
            }

            try:
                output = json.loads(response.choices[0].message.content)
                content_id = int(output['content_id'])
            except json.JSONDecodeError:
                print(f"Invalid JSON from LLM on attempt {attempt + 1}. Retrying...")
                continue
            except (KeyError, TypeError, ValueError):
                # Missing key, null/empty content, or a non-numeric id.
                print(f"'content_id' missing or invalid in response on attempt {attempt + 1}. Retrying...")
                continue

            if self.is_valid_content_id(content_id):
                return content_id, tokens

            print(f"'content_id' missing or invalid in response on attempt {attempt + 1}. Retrying...")

        print("Max retries exceeded. Returning empty response.")
        return 0, 0

    def _get_recommendation_random(self):
        """
        Randomly pick ONE valid item from the top-5 of each requested section.

        If the first random pick is missing/invalid, keep trying other
        candidates; if the user's own candidates yield nothing, retry with the
        popular contents. The picked item is removed from every section of the
        returned JSON.

        :return: ``(recommendation_dict, content_info, updated_recsys_json,
            zero_tokens_dict)``, or four ``None`` values when nothing could be picked
        """
        no_recommendation = (None, None, None, None)

        recsys_json = self._get_user_recommendation()
        try:
            recsys_data = json.loads(recsys_json) if recsys_json else {}
        except (TypeError, ValueError):
            recsys_data = {}
        if not isinstance(recsys_data, dict):
            recsys_data = {}

        sections = self.Core.recsys_contents
        unique_candidates = self.build_unique_candidates(recsys_data, sections)

        # No candidates at all: fall back to the popular contents.
        used_popular_fallback = False
        if not unique_candidates:
            recsys_data = self._get_popular_fallback_json(k=5)
            unique_candidates = self.build_unique_candidates(recsys_data, sections)
            used_popular_fallback = True

        if not unique_candidates:
            return no_recommendation

        idxs = list(range(len(unique_candidates)))
        random.shuffle(idxs)
        picked_id, recommendation_dict, content_info = self.try_pick_from_candidates(
            idxs, unique_candidates, recsys_data
        )

        # Candidates existed but none was usable: try the popular contents once.
        if picked_id is None and not used_popular_fallback:
            recsys_data = self._get_popular_fallback_json(k=5)
            unique_candidates = self.build_unique_candidates(recsys_data, sections)
            if unique_candidates:
                idxs = list(range(len(unique_candidates)))
                random.shuffle(idxs)
                picked_id, recommendation_dict, content_info = self.try_pick_from_candidates(
                    idxs, unique_candidates, recsys_data
                )

        if picked_id is None:
            return no_recommendation

        recsys_data = self._remove_selected_from_all(recsys_data, picked_id)

        self.selected_content_ids = [r["content_id"] for r in unique_candidates if r.get("content_id")]

        updated_json = json.dumps(recsys_data)
        zero_tokens = {"prompt_tokens": 0, "completion_tokens": 0}

        return recommendation_dict, content_info, updated_json, zero_tokens

    def build_unique_candidates(self, src_data, sections):
        """
        Collect the top-5 items of each section, de-duplicated by content_id.

        :param src_data: recsys-style dict ``{section: [rec, ...]}``
        :param sections: section names to read
        :return: list of rec dicts in first-seen order; items with a falsy
            ``content_id`` are dropped
        """
        cands = self._collect_top_k(src_data, sections, k=5)
        seen, uniq = set(), []
        for rec in cands or []:
            cid = rec.get("content_id")
            if cid and cid not in seen:
                seen.add(cid)
                uniq.append(rec)
        return uniq

    def try_pick_from_candidates(self, idxs, candidates, source_data):
        """
        Iterate candidates in the given order, returning the first valid pick.

        A candidate is skipped when it has no id, is listed in the
        ``banned_contents`` config entry, has no content info, or has no
        metadata in ``source_data``.

        :param idxs: indices into ``candidates``, in the order to try them
        :param candidates: list of rec dicts
        :param source_data: recsys-style dict the candidates came from
        :return: ``(picked_id, recommendation_dict, content_info)`` or
            ``(None, None, None)``
        """
        banned_contents = set(self.Core.config_file.get("banned_contents", []))

        for i in idxs:
            rec = candidates[i]
            picked_id = rec.get("content_id")
            if not picked_id or picked_id in banned_contents:
                continue

            try:
                content_info = self._get_content_info(picked_id)
                if not content_info:
                    continue

                recommendation_dict = self._get_recommendation_info(picked_id, source_data)
            except Exception:
                # Deliberately best-effort: a broken candidate must not abort
                # the whole run, so move on to the next one.
                continue

            if recommendation_dict is None:
                continue

            return picked_id, recommendation_dict, content_info

        return None, None, None

    def _get_recommendation_info(self, content_id, recsys_data):
        """
        Look up the display metadata of a content id in the recsys data.

        :param content_id: id to search for
        :param recsys_data: recsys-style dict ``{section: [rec, ...]}``
        :return: dict with ``content_id``, ``web_url_path``, ``title`` and
            ``thumbnail_url``, or ``None`` when the id is not found
        """
        found_item = None
        for items in recsys_data.values():
            if not isinstance(items, list):
                # Same guard as _collect_top_k / _remove_selected_from_all.
                continue
            found_item = next(
                (
                    item for item in items
                    if isinstance(item, dict) and item.get("content_id") == content_id
                ),
                None,
            )
            if found_item is not None:
                break

        if found_item is None:
            print(f"content_id {content_id} not found in recsys_data")
            return None

        return {
            "content_id": content_id,
            "web_url_path": found_item.get("web_url_path"),
            "title": found_item.get("title"),
            "thumbnail_url": found_item.get("thumbnail_url"),
        }

    @staticmethod
    def _rank_key(rec):
        """Sort key: ``recommendation_rank`` ascending, unranked items last."""
        return rec.get("recommendation_rank", float("inf"))

    def _collect_top_k(self, recsys_data: dict, sections, k: int = 5):
        """
        From each section, grab the top-k items by recommendation_rank (ascending).

        :return: a flat list of rec dicts; sections that are missing or not
            lists contribute nothing
        """
        if not isinstance(recsys_data, dict):
            return []

        out = []
        for sec in sections:
            recs = recsys_data.get(sec, [])
            if not isinstance(recs, list):
                continue
            out.extend(sorted(recs, key=self._rank_key)[:k])
        return out

    def _get_popular_fallback_json(self, k: int = 5):
        """
        Build a recsys-like dict from popular contents when the user has no recsys_result.

        Assumes ``Core.popular_contents_df.iloc[0]['popular_content']`` holds a
        JSON string with the same structure:
        ``{section: [{content_id, recommendation_rank, ...}, ...], ...}``

        :return: ``{section: top-k recs}`` for every section in
            ``Core.recsys_contents``, or ``{}`` when the popular contents are
            unavailable or malformed
        """
        try:
            popular_json = self.Core.popular_contents_df.iloc[0]["popular_content"]
            data = json.loads(popular_json)
        except (AttributeError, IndexError, KeyError, TypeError, ValueError):
            return {}
        if not isinstance(data, dict):
            return {}

        out = {}
        for sec in self.Core.recsys_contents:
            recs = data.get(sec, [])
            if not isinstance(recs, list):
                recs = []
            out[sec] = sorted(recs, key=self._rank_key)[:k]
        return out

    def _remove_selected_from_all(self, recsys_data: dict, content_id):
        """
        Remove the chosen content_id from every section so it won't be recommended again.

        :return: the same ``recsys_data`` dict, updated in place
        """
        for sec, recs in list(recsys_data.items()):
            if isinstance(recs, list):
                recsys_data[sec] = [r for r in recs if r.get("content_id") != content_id]
        return recsys_data
| |
|
| |
|