Spaces:
Sleeping
Sleeping
| import os | |
| import pandas as pd | |
| from .state import State , ValidationFormatter , CriticResponseFormatter | |
| from .tools import Retrieval | |
| from langgraph.prebuilt import create_react_agent | |
| from src.genai.utils.models_loader import ideator_llm, critic_llm , normalizer_llm , validator_llm , judge1_llm , judge2_llm , simplifier_llm , moderator_llm | |
| from langchain_core.messages import SystemMessage , HumanMessage, FunctionMessage | |
| from .prompts import ideator_prompt ,critic_prompt, moderator_prompt , validator_prompt, judge_prompt, simplifier_prompt | |
| from .schemas import ideation_json_schema , judge_response_json_schema | |
class RetrieverNode:
    """Graph node that would fetch influencer data for the current business."""

    def __init__(self):
        pass

    def run(self, state: State):
        """Append a placeholder for influencer data and return the state.

        Retrieval is currently disabled (calls commented out); the placeholder
        keeps downstream nodes' `[-1]` indexing on `influencers_data` valid.
        """
        # influencers_data = Retrieval(state.business_details[-1]).influencers_data()
        placeholder = 'Nothing.'
        state.influencers_data.append(placeholder)
        print('Retriever Node completed...')
        # imdb_data = Retrieval(state.business_details[-1]).imdb_ideas()
        # state.imdb_data.append(imdb_data)
        return state
class IdeatorNode:
    """Graph node that asks the ideator LLM for campaign ideas."""

    def __init__(self):
        self.llm = ideator_llm

    def run(self, state: State):
        """Generate ideas from the business details and image caption; store them."""
        chat = [
            SystemMessage(content=ideator_prompt()),
            HumanMessage(content=f'''The business_details is\n{state.business_details[-1]}\n
The information of the image is:\n{state.image_caption[-1]}'''),
        ]
        # FunctionMessage(name='imdb_ideas_function', content=f'''The data of imdb movies description is:\n {state.imdb_data[-1]}\n''')]
        llm_reply = self.llm.invoke(chat)
        print('Ideator Response:', llm_reply.content)
        print('The scores are:', state.scores[-1])
        state.ideator_response.append(str(llm_reply.content))
        return state
class ModeratorNode:
    """Graph node that moderates the ideator's output using the judge scores."""

    def __init__(self):
        self.llm = moderator_llm

    def run(self, state: State):
        """Invoke the moderator LLM on the latest ideas and scores; store the reply."""
        system_prompt = moderator_prompt()
        chat = [
            SystemMessage(content=system_prompt),
            HumanMessage(content=f'''The ideas generated by ideator are:\n{state.ideator_response[-1]}\n'''),
            FunctionMessage(name='moderator', content=f'''The scores are: \n {str(state.scores[-1])}'''),
        ]
        reply = self.llm.invoke(chat)
        state.moderator_response.append(str(reply.content))
        print('Moderator Response:', state.moderator_response[-1])
        return state
class SimplifierNode:
    """Graph node that simplifies moderated ideas and archives them to a CSV."""

    def __init__(self):
        self.llm = simplifier_llm

    def run(self, state: State):
        """Simplify the moderated ideas, store them, and append a row to the ideas CSV.

        Fixes:
        - ``to_csv`` is now called with ``index=False``; previously the
          DataFrame index was written as an extra unnamed column, so every
          read/append/write cycle grew the file by one junk column.
        - The completion log said 'Ideator Node executed' (copy-paste from
          IdeatorNode); it now names this node.
        """
        template = simplifier_prompt()
        messages = [
            SystemMessage(content=template),
            HumanMessage(content=f'''The ideas generated by ideator are:\n{state.moderator_response[-1]}\n'''),
        ]
        response = self.llm.invoke(messages)
        print('Simplifier Response:', response.content)
        state.simplifier_response.append(str(response.content))
        # Append the (business details, simplified ideas) pair to the archive.
        csv_path = 'src/genai/utils/ideas/ideas.csv'
        df = pd.read_csv(csv_path)
        df = pd.concat([df, pd.DataFrame({
            'BusinessDetails': [state.business_details[-1]],
            'Ideas': [state.simplifier_response[-1]],
        })], ignore_index=True)
        df.to_csv(csv_path, index=False)
        print('Simplifier Node executed')
        return state
class CriticNode:
    """Graph node that critiques the ideator's ideas."""

    def __init__(self):
        self.llm = critic_llm

    def run(self, state: State):
        """Run the critic LLM over the latest ideas and record its feedback."""
        chat = [
            SystemMessage(content=critic_prompt()),
            HumanMessage(content=f'''The ideas generated by ideator are:\n{state.ideator_response[-1]}\n.
The business_details is\n{state.business_details[-1]}\n
The information of the image is:\n{state.image_caption[-1]}'''),
        ]
        # FunctionMessage(name='imdb_ideas_function', content=f'''The data of imdb movies description is:\n {state.imdb_data[-1]}\n''')]
        feedback = self.llm.invoke(chat)
        state.critic_response.append(str(feedback.content))
        print('Critic Response:', feedback.content)
        print('Critic Node executed')
        return state
class NormalizerNode:
    """Graph node that coerces the simplifier's free text into the ideation JSON schema."""

    def __init__(self):
        self.llm = normalizer_llm

    def run(self, state: State):
        """Structure the latest simplifier output and append it to the state."""
        structured_llm = self.llm.with_structured_output(ideation_json_schema)
        parsed = structured_llm.invoke(str(state.simplifier_response[-1]))
        state.normalizer_response.append(parsed)
        print('Normalizer Executed')
        return state
class Judge:
    """Shared judging logic: evaluates the normalized ideas with a given LLM."""

    def __init__(self, llm):
        self.llm = llm

    def run(self, state: State):
        """Return the judge LLM's structured verdict on the latest ideas."""
        system_prompt = judge_prompt(state)
        chat = [
            SystemMessage(content=system_prompt),
            HumanMessage(content=f'''The generated 10 ideas are:\n{state.normalizer_response[-1]}\n.
The business_details is\n{state.business_details[-1]}\n
The information of image is:{state.image_caption[-1]}\n'''),
        ]
        return self.llm.with_structured_output(judge_response_json_schema).invoke(chat)
class JudgeNode1:
    """Graph node wrapping Judge with the first judge LLM."""

    def __init__(self):
        self.llm = judge1_llm

    def run(self, state: State):
        """Return a partial state update carrying judge 1's verdict."""
        verdict = Judge(self.llm).run(state)
        return {'judge1_response': [verdict]}
class JudgeNode2:
    """Graph node wrapping Judge with the second judge LLM."""

    def __init__(self):
        self.llm = judge2_llm

    def run(self, state: State):
        """Return a partial state update carrying judge 2's verdict."""
        verdict = Judge(self.llm).run(state)
        return {'judge2_response': [verdict]}
class Aggregrator:
    """Merges the idea selections of both judges, de-duplicating by title."""

    def __init__(self):
        # Kept for backward compatibility with any external reader; run() now
        # rebuilds this mapping on every invocation (see bug note below).
        self.unique_ideas = {}

    def run(self, state: 'State'):
        """Combine both judges' selected ideas, drop duplicate titles, store the result.

        Bug fix: the title->idea mapping used to accumulate across calls on the
        same node instance, so ideas from an earlier invocation could leak into
        later results. The mapping is now reset at the start of each run.
        """
        self.unique_ideas = {}
        all_selected_ideas = [
            *state.judge1_response[-1]['selected_ideas'],
            *state.judge2_response[-1]['selected_ideas'],
        ]
        print('All selected ideas:', all_selected_ideas)
        # First occurrence of each title wins (judge 1 has priority).
        for idea in all_selected_ideas:
            self.unique_ideas.setdefault(idea['title'], idea)
        state.unique_selected_ideas.append(list(self.unique_ideas.values()))
        return state
class ValidatorNode:
    """Graph node that validates the selected ideas against the business details."""

    def __init__(self):
        # NOTE(review): both attributes point at the same LLM and run() only
        # uses validator_llm1 — confirm whether a second validator was planned.
        self.validator_llm1 = validator_llm
        self.validator_llm2 = validator_llm

    def get_response(self, state, validator_llm):
        """Ask the given validator LLM for a structured validation verdict."""
        chat = [
            SystemMessage(content=validator_prompt(state)),
            HumanMessage(content=f'''The business_details is:\n{state.business_details[-1]}'''),
        ]
        return validator_llm.with_structured_output(ValidationFormatter).invoke(chat)

    def run(self, state: State):
        """Record the validation result; capture the reason when not validated."""
        verdict = self.get_response(state, self.validator_llm1)
        state.validator_response.append(verdict.result)
        if 'not validated' in verdict.result:
            state.disagreement_reason.append(verdict.reason)
        return state
class RoutingAfterValidation:
    """Conditional-edge helper: decides whether validation passed."""

    def __init__(self):
        pass

    def route(self, state: State):
        """Return True when the latest verdict does not contain 'not validated'."""
        latest_verdict = state.validator_response[-1]
        return 'not validated' not in latest_verdict