""" This is the source code for the webscraper agents that use ChainLit Features: - Uses top N google search based on a keyword, create a JSON file, uploads it to Google Cloud Object Storage or Locally - Continuous messaging - Multithreading Written by: Antoine Ross - October 2023. """ import os from typing import Dict, Optional, Union from dotenv import load_dotenv, find_dotenv import chainlit as cl from chainlit.client.base import ConversationDict from chainlit.types import AskFileResponse from langchain.document_loaders import PyPDFLoader, TextLoader from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain.chains import ConversationalRetrievalChain import autogen from autogen import Agent, AssistantAgent, UserProxyAgent, config_list_from_json from webscrapereddit import grab_articles load_dotenv(find_dotenv()) WELCOME_MESSAGE = f"""Soap Opera Team 👾 \n\n Let's make some soap opera! What topic do you want to search? """ # Agents USER_PROXY_NAME = "Query Agent" PROOF_READER = "Proofreader" WRITER = "Writer" WRITER2 = "Writer2" PLANNER = "Planner" CHARACTER_DEVELOPER = "Character Developer" DIALOGUE_SPECIALIST = "Dialogue Specialist" ARTICLES = None text_splitter = RecursiveCharacterTextSplitter(chunk_size=8192, chunk_overlap=100) def load_article(file_path): try: with open(file_path, 'r') as file: article = file.read() return article except FileNotFoundError: print("File not found") return None # Function to process the file def process_file(file: AskFileResponse): import tempfile if file.type == "text/plain": Loader = TextLoader elif file.type == "application/pdf": Loader = PyPDFLoader with tempfile.NamedTemporaryFile(mode="wb", delete=False) as tempfile: if file.type == "text/plain": tempfile.write(file.content) elif file.type == "application/pdf": with open(tempfile.name, "wb") as f: f.write(file.content) loader = Loader(tempfile.name) documents = loader.load() docs = text_splitter.split_documents(documents) for i, doc in 
enumerate(docs): doc.metadata["source"] = f"source_{i}" cl.user_session.set("docs", docs) return docs async def ask_helper(func, **kwargs): res = await func(**kwargs).send() while not res: res = await func(**kwargs).send() return res class ChainlitAssistantAgent(AssistantAgent): """ Wrapper for AutoGens Assistant Agent """ def send( self, message: Union[Dict, str], recipient: Agent, request_reply: Optional[bool] = None, silent: Optional[bool] = False, ) -> bool: cl.run_sync( cl.Message( content=f'*Sending message to "{recipient.name}":*\n\n{message}', author=self.name, ).send() ) super(ChainlitAssistantAgent, self).send( message=message, recipient=recipient, request_reply=request_reply, silent=silent, ) class ChainlitUserProxyAgent(UserProxyAgent): """ Wrapper for AutoGens UserProxy Agent. Simplifies the UI by adding CL Actions. """ def get_human_input(self, prompt: str) -> str: if prompt.startswith( "Provide feedback to chat_manager. Press enter to skip and use auto-reply" ): res = cl.run_sync( ask_helper( cl.AskActionMessage, content="Continue or provide feedback?", actions=[ cl.Action( name="continue", value="continue", label="✅ Continue" ), cl.Action( name="feedback",value="feedback", label="💬 Provide feedback"), cl.Action( name="exit",value="exit", label="🔚 Exit Conversation" ) ], ) ) if res.get("value") == "continue": return "" if res.get("value") == "exit": return "exit" reply = cl.run_sync(ask_helper(cl.AskUserMessage, content=prompt, timeout=120)) return reply["content"].strip() def send( self, message: Union[Dict, str], recipient: Agent, request_reply: Optional[bool] = None, silent: Optional[bool] = False, ): cl.run_sync( cl.Message( content=f'*Sending message to "{recipient.name}"*:\n\n{message}', author=self.name, ).send() ) super(ChainlitUserProxyAgent, self).send( message=message, recipient=recipient, request_reply=request_reply, silent=silent, ) # @cl.oauth_callback # def oauth_callback( # provider_id: str, # token: str, # raw_user_data: Dict[str, 
# str],
#   default_app_user: cl.AppUser,
# ) -> Optional[cl.AppUser]:
#   return default_app_user

# @cl.on_chat_resume
# async def on_chat_resume(conversation: ConversationDict):
#   # Access the root messages
#   root_messages = [m for m in conversation["messages"] if m["parentId"] == None]
#   # Access the user_session
#   cl.user_session.get("chat_profile")
#   # Or just pass if you do not need to run any custom logic
#   pass

# Model configuration comes from .env; only gpt-4-1106-preview is allowed through.
config_list = autogen.config_list_from_dotenv(
    dotenv_file_path='.env',
    model_api_key_map={
        "gpt-4-1106-preview": "OPENAI_API_KEY",
    },
    filter_dict={
        "model": {
            "gpt-4-1106-preview",
        }
    }
)


@cl.action_callback("confirm_action")
async def on_action(action: cl.Action):
    """Handle the news-source choice and clear the action buttons afterwards."""
    if action.value == "everything":
        content = "everything"
    elif action.value == "top-headlines":
        content = "top_headlines"
    else:
        await cl.ErrorMessage(content="Invalid action").send()
        return

    # Remove the buttons from the previous prompt so they can't be re-clicked.
    prev_msg = cl.user_session.get("url_actions")  # type: cl.Message
    if prev_msg:
        await prev_msg.remove_actions()
        cl.user_session.set("url_actions", None)

    await cl.Message(content=content).send()


@cl.on_chat_start
async def on_chat_start():
    """Create the soap-opera agent team, gather source articles, and greet the user.

    Flow: build the LLM-backed agents, stash them in the user session, ask
    whether to pull fresh content from Reddit or fall back to a local
    articles.txt, then cache the article text for run_conversation.
    """
    OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
    try:
        # app_user = cl.user_session.get("user")
        # await cl.Message(f"Hello {app_user.username}").send()
        # config_list = config_list_from_json(env_or_file="OAI_CONFIG_LIST")
        llm_config = {
            "config_list": config_list,
            "api_key": OPENAI_API_KEY,
            "seed": 42,
            "request_timeout": 180,
            "retry_wait_time": 60,
        }

        proof_reader = ChainlitAssistantAgent(
            name="Proof_Reader",
            llm_config=llm_config,
            system_message="""Proof_Reader. Review and refine the content produced by the Writer and Dialogue Specialist for grammar, style, and coherence. Provide feedback directly to them and collaborate with the Planner to ensure the content aligns with the overall plan.""",
        )
        writer = ChainlitAssistantAgent(
            name="Writer",
            llm_config=llm_config,
            system_message="""Develop the main storyline and setting for the soap opera. Collaborate with the Character Developer for character integration and the Planner for aligning your ideas with the overall concept. Share your drafts with the Dialogue Specialist for dialogue development and the Proof_Reader for quality checks. Write down the final output with headings and subheadings.""",
        )
        character_developer = ChainlitAssistantAgent(
            name="Character_Developer",
            llm_config=llm_config,
            system_message="""Character Developer. Design comprehensive character profiles, including backstories, motivations, and relationships. Advise the Planner to integrate characters into the storyline and with the Dialogue Specialist to ensure character-consistent dialogues.""",
        )
        dialogue_specialist = ChainlitAssistantAgent(
            name="Dialogue_Specialist",
            llm_config=llm_config,
            system_message="""Dialogue Specialist. Develop dialogues that reflect each character's personality and the overall tone of the soap opera. Collaborate with the Character Developer for character insights and with the Planner to fit dialogues into the narrative structure.""",
        )
        planner = ChainlitAssistantAgent(
            name="Planner",
            llm_config=llm_config,
            system_message="""Planner. Guide the overall direction of the soap opera. Generate initial ideas and oversee the development process. Regularly consult with the Writer, Character Developer, and Dialogue Specialist to ensure consistency and alignment with the chosen theme. Provide feedback and adjustments as needed. First make 20 ideas, then narrow it down to 10 ideas, then narrow it down to 5, then to 3, then to 1. Narrow it down based on "What will be the most dramatic, emotional and entertaining idea". Everytime you narrow the idea down ask the User_Proxy to agree with the idea. Finally, when the last idea is finalized, discuss with the Writer to create a general idea of how the Soap Opera should look like.""",
        )
        user_proxy = ChainlitUserProxyAgent(
            name="User_Proxy",
            human_input_mode="ALWAYS",
            llm_config=llm_config,
            # max_consecutive_auto_reply=3,
            # is_termination_msg=lambda x: x.get("content", "").rstrip().endswith("TERMINATE"),
            code_execution_config=False,
            system_message="""Manager. Administrate the agents on a plan. Communicate with the proofreader to proofread the output. Communicate with the writer to analyse the content and ask for it to give a summary. Reply CONTINUE, or the reason why the task is not solved yet.""",
        )

        # Stash the agents so run_conversation can retrieve them later.
        # NOTE(review): WRITER2 is never set here, which is why run_conversation
        # must not rely on it.
        cl.user_session.set(USER_PROXY_NAME, user_proxy)
        cl.user_session.set(PROOF_READER, proof_reader)
        cl.user_session.set(WRITER, writer)
        cl.user_session.set(PLANNER, planner)
        cl.user_session.set(DIALOGUE_SPECIALIST, dialogue_specialist)
        cl.user_session.set(CHARACTER_DEVELOPER, character_developer)

        # WEB-SCRAPING LOGIC
        URL = None
        URL_option = None
        BASE_URL_EVERYTHING = 'https://newsapi.org/v2/everything?'
        BASE_URL_TOP_HEADLINES = 'https://newsapi.org/v2/top-headlines?'
        SORTBY = 'relevancy'  # relevancy, popularity, publishedAt
        COUNTRY = 'gb'  # Country you want to use
        CATEGORY = None  # 'business' # Options: business, entertainment, general health, science, sports, technology
        API_KEY = os.getenv('NEWS_API_KEY')

        doc = cl.Action( name="doc", value="doc", label="Document" )
        no_doc = cl.Action( name="no_doc", value="no_doc", label="NoDocument" )
        idea = cl.Action( name="Idea", value="Idea", label="Idea" )
        no_idea = cl.Action( name="NoIdea", value="NoIdea", label="NoIdea" )
        everything = cl.Action( name="everything", value="everything", label="Everything" )
        # BUG FIX: action name had a typo ("t3op-headlines"), which broke
        # matching against the "top-headlines" action name.
        top_headlines = cl.Action( name="top-headlines", value="top-headlines", label="TopHeadlines")
        business = cl.Action( name="business", value="business", label="Business" )
        entertainment = cl.Action( name="entertainment", value="entertainment", label="Entertainment" )
        science = cl.Action( name="science", value="science", label="Science" )
        sports = cl.Action( name="sports", value="sports", label="Sports" )
        technology = cl.Action( name="technology", value="technology", label="Technology" )

        doc_actions = [doc, no_doc]
        idea_actions = [idea, no_idea]
        url_actions = [everything, top_headlines]
        category_actions = [business, entertainment, science, sports, technology]

        # Hi, let’s generate some storyline ideas, if you have any ideas type them if not press enter
        # BUG FIX: the original inspected the *prompt* text
        # (`IDEA_option.content.split()[-1]`), not the user's response, so the
        # "Idea" branch was unreachable. Capture the result of send() and read
        # the chosen action's value instead.
        idea_response = await cl.AskActionMessage(
            content="Hi, let’s generate some storyline ideas. Would you like to generate ideas from Reddit, or continue?",
            actions=idea_actions,
        ).send()
        IDEA_option = idea_response.get("value") if idea_response else None

        if IDEA_option == "Idea":
            print("Using document...")
            TOPIC = None
            # Re-ask until the user supplies a topic (send() returns None on timeout).
            while TOPIC is None:
                TOPIC = await cl.AskUserMessage(content="What topic would you like to make a Soap Opera about? [Only send one keyword.]", timeout=180).send()
            print("Topic: ", TOPIC['content'])
            msg = cl.Message( content=f"Processing data from Reddit...", disable_human_feedback=True )
            await msg.send()
            articles = grab_articles(TOPIC['content'])
            msg = cl.Message( content=f"Content from Reddit loaded: \n{articles}", disable_human_feedback=True )
            await msg.send()
        else:
            # Fall back to a locally saved article dump.
            article_path = "articles.txt"
            articles = load_article(article_path)
            print("Articles grabbed.")

        msg = cl.Message(content=f"Processing `{articles}`...", disable_human_feedback=True, author="User_Proxy")
        await msg.send()
        cl.user_session.set(ARTICLES, articles)
        print("Articles set...")

        msg = cl.Message(content=f"""This is the Soap Opera Team, please give instructions on how the Soap Opera should be structured and made. \nSample input: "Create a captivating soap opera scene inspired by the content of the articles. Feature characters who are entangled in a complex web of emotions, ambitions, and conflicts. Craft dialogue and actions that convey the essence of the articles, infusing drama, suspense, and emotion. Leave the audience eagerly anticipating the next twist in this gripping narrative." """, disable_human_feedback=True, author="User_Proxy")
        await msg.send()
    except Exception as e:
        # Best-effort startup: surface the error on the server console rather
        # than crashing the chat session.
        print("Error: ", e)
        pass


@cl.on_message
async def run_conversation(message: cl.Message):
    """Start (or continue) the agent group chat using the user's instructions."""
    TASK = message.content
    print("Task: ", TASK)

    # Retrieve the agents created in on_chat_start.
    proof_reader = cl.user_session.get(PROOF_READER)
    user_proxy = cl.user_session.get(USER_PROXY_NAME)
    writer = cl.user_session.get(WRITER)
    planner = cl.user_session.get(PLANNER)
    articles = cl.user_session.get(ARTICLES)
    character_developer = cl.user_session.get(CHARACTER_DEVELOPER)
    dialogue_specialist = cl.user_session.get(DIALOGUE_SPECIALIST)

    groupchat = autogen.GroupChat(
        agents=[user_proxy, proof_reader, writer, character_developer, dialogue_specialist, planner],
        messages=[],
        max_round=50,
    )
    manager = autogen.GroupChatManager(groupchat=groupchat)

    print("Initiated GC messages... \nGC messages length: ", len(groupchat.messages))
    if len(groupchat.messages) == 0:
        message = f"""Access the output from {articles}. Create a Soap Opera based on the article details inside. Ensure to always check the length of the context to avoid hitting the context limit. First create 10 ideas, then 5, then 3, then 1. Finalize the ideas with the planner and make sure to follow the criteria of choosing based on: "What will be the most dramatic, emotional and entertaining idea" Do not express gratitude in responses.\n Instructions for creating Soap Opera:""" + TASK
        await cl.Message(content=f"""Starting agents on task of creating a Soap Opera...""").send()
        await cl.make_async(user_proxy.initiate_chat)(
            manager,
            message=message,
        )
    else:
        # BUG FIX: the original `send(manager, message=TASK)` bound `manager`
        # positionally to the `message` parameter and then also passed
        # `message=TASK`, raising TypeError. Pass both arguments by keyword.
        await cl.make_async(user_proxy.send)(
            message=TASK,
            recipient=manager,
        )