Spaces:
Build error
Build error
| """ | |
| This is the source code for the webscraper agents that use ChainLit | |
| Features: | |
| - Uses top N google search based on a keyword, create a JSON file, uploads it to Google Cloud Object Storage or Locally | |
| - Continuous messaging | |
| - Multithreading | |
| Written by: Antoine Ross - October 2023. | |
| """ | |
| import os | |
| from typing import Dict, Optional, Union | |
| from dotenv import load_dotenv, find_dotenv | |
| import chainlit as cl | |
| from chainlit.client.base import ConversationDict | |
| from chainlit.types import AskFileResponse | |
| from langchain.document_loaders import PyPDFLoader, TextLoader | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from langchain.chains import ConversationalRetrievalChain | |
| import autogen | |
| from autogen import Agent, AssistantAgent, UserProxyAgent, config_list_from_json | |
| from webscrapereddit import grab_articles | |
| load_dotenv(find_dotenv()) | |
| WELCOME_MESSAGE = f"""Soap Opera Team 👾 | |
| \n\n | |
| Let's make some soap opera! What topic do you want to search? | |
| """ | |
| # Agents | |
| USER_PROXY_NAME = "Query Agent" | |
| PROOF_READER = "Proofreader" | |
| WRITER = "Writer" | |
| WRITER2 = "Writer2" | |
| PLANNER = "Planner" | |
| CHARACTER_DEVELOPER = "Character Developer" | |
| DIALOGUE_SPECIALIST = "Dialogue Specialist" | |
| ARTICLES = None | |
| text_splitter = RecursiveCharacterTextSplitter(chunk_size=8192, chunk_overlap=100) | |
| def load_article(file_path): | |
| try: | |
| with open(file_path, 'r') as file: | |
| article = file.read() | |
| return article | |
| except FileNotFoundError: | |
| print("File not found") | |
| return None | |
| # Function to process the file | |
| def process_file(file: AskFileResponse): | |
| import tempfile | |
| if file.type == "text/plain": | |
| Loader = TextLoader | |
| elif file.type == "application/pdf": | |
| Loader = PyPDFLoader | |
| with tempfile.NamedTemporaryFile(mode="wb", delete=False) as tempfile: | |
| if file.type == "text/plain": | |
| tempfile.write(file.content) | |
| elif file.type == "application/pdf": | |
| with open(tempfile.name, "wb") as f: | |
| f.write(file.content) | |
| loader = Loader(tempfile.name) | |
| documents = loader.load() | |
| docs = text_splitter.split_documents(documents) | |
| for i, doc in enumerate(docs): | |
| doc.metadata["source"] = f"source_{i}" | |
| cl.user_session.set("docs", docs) | |
| return docs | |
| async def ask_helper(func, **kwargs): | |
| res = await func(**kwargs).send() | |
| while not res: | |
| res = await func(**kwargs).send() | |
| return res | |
| class ChainlitAssistantAgent(AssistantAgent): | |
| """ | |
| Wrapper for AutoGens Assistant Agent | |
| """ | |
| def send( | |
| self, | |
| message: Union[Dict, str], | |
| recipient: Agent, | |
| request_reply: Optional[bool] = None, | |
| silent: Optional[bool] = False, | |
| ) -> bool: | |
| cl.run_sync( | |
| cl.Message( | |
| content=f'*Sending message to "{recipient.name}":*\n\n{message}', | |
| author=self.name, | |
| ).send() | |
| ) | |
| super(ChainlitAssistantAgent, self).send( | |
| message=message, | |
| recipient=recipient, | |
| request_reply=request_reply, | |
| silent=silent, | |
| ) | |
| class ChainlitUserProxyAgent(UserProxyAgent): | |
| """ | |
| Wrapper for AutoGens UserProxy Agent. Simplifies the UI by adding CL Actions. | |
| """ | |
| def get_human_input(self, prompt: str) -> str: | |
| if prompt.startswith( | |
| "Provide feedback to chat_manager. Press enter to skip and use auto-reply" | |
| ): | |
| res = cl.run_sync( | |
| ask_helper( | |
| cl.AskActionMessage, | |
| content="Continue or provide feedback?", | |
| actions=[ | |
| cl.Action( name="continue", value="continue", label="✅ Continue" ), | |
| cl.Action( name="feedback",value="feedback", label="💬 Provide feedback"), | |
| cl.Action( name="exit",value="exit", label="🔚 Exit Conversation" ) | |
| ], | |
| ) | |
| ) | |
| if res.get("value") == "continue": | |
| return "" | |
| if res.get("value") == "exit": | |
| return "exit" | |
| reply = cl.run_sync(ask_helper(cl.AskUserMessage, content=prompt, timeout=120)) | |
| return reply["content"].strip() | |
| def send( | |
| self, | |
| message: Union[Dict, str], | |
| recipient: Agent, | |
| request_reply: Optional[bool] = None, | |
| silent: Optional[bool] = False, | |
| ): | |
| cl.run_sync( | |
| cl.Message( | |
| content=f'*Sending message to "{recipient.name}"*:\n\n{message}', | |
| author=self.name, | |
| ).send() | |
| ) | |
| super(ChainlitUserProxyAgent, self).send( | |
| message=message, | |
| recipient=recipient, | |
| request_reply=request_reply, | |
| silent=silent, | |
| ) | |
| # @cl.oauth_callback | |
| # def oauth_callback( | |
| # provider_id: str, | |
| # token: str, | |
| # raw_user_data: Dict[str, str], | |
| # default_app_user: cl.AppUser, | |
| # ) -> Optional[cl.AppUser]: | |
| # return default_app_user | |
| # @cl.on_chat_resume | |
| # async def on_chat_resume(conversation: ConversationDict): | |
| # # Access the root messages | |
| # root_messages = [m for m in conversation["messages"] if m["parentId"] == None] | |
| # # Access the user_session | |
| # cl.user_session.get("chat_profile") | |
| # # Or just pass if you do not need to run any custom logic | |
| # pass | |
| config_list = autogen.config_list_from_dotenv( | |
| dotenv_file_path='.env', | |
| model_api_key_map={ | |
| "gpt-4-1106-preview": "OPENAI_API_KEY", | |
| }, | |
| filter_dict={ | |
| "model": { | |
| "gpt-4-1106-preview", | |
| } | |
| } | |
| ) | |
| async def on_action(action: cl.Action): | |
| if action.value == "everything": | |
| content = "everything" | |
| elif action.value == "top-headlines": | |
| content = "top_headlines" | |
| else: | |
| await cl.ErrorMessage(content="Invalid action").send() | |
| return | |
| prev_msg = cl.user_session.get("url_actions") # type: cl.Message | |
| if prev_msg: | |
| await prev_msg.remove_actions() | |
| cl.user_session.set("url_actions", None) | |
| await cl.Message(content=content).send() | |
| async def on_chat_start(): | |
| OPENAI_API_KEY = os.getenv('OPENAI_API_KEY') | |
| try: | |
| # app_user = cl.user_session.get("user") | |
| # await cl.Message(f"Hello {app_user.username}").send() | |
| # config_list = config_list_from_json(env_or_file="OAI_CONFIG_LIST") | |
| llm_config = {"config_list": config_list, "api_key": OPENAI_API_KEY, "seed": 42, "request_timeout": 180, "retry_wait_time": 60} | |
| proof_reader = ChainlitAssistantAgent( | |
| name="Proof_Reader", llm_config=llm_config, | |
| system_message="""Proof_Reader. Review and refine the content produced by the Writer and | |
| Dialogue Specialist for grammar, style, and coherence. Provide feedback directly to them | |
| and collaborate with the Planner to ensure the content aligns with the overall plan.""" | |
| ) | |
| writer = ChainlitAssistantAgent( | |
| name="Writer", llm_config=llm_config, | |
| system_message="""Develop the main storyline and setting for the soap opera. Collaborate | |
| with the Character Developer for character integration and the Planner for aligning your | |
| ideas with the overall concept. Share your drafts with the Dialogue Specialist for dialogue | |
| development and the Proof_Reader for quality checks. Write down the final output with headings and subheadings.""" | |
| ) | |
| character_developer = ChainlitAssistantAgent( | |
| name="Character_Developer", llm_config=llm_config, | |
| system_message="""Character Developer. Design comprehensive character profiles, including backstories, | |
| motivations, and relationships. Advise the Planner to integrate characters into the storyline | |
| and with the Dialogue Specialist to ensure character-consistent dialogues.""" | |
| ) | |
| dialogue_specialist = ChainlitAssistantAgent( | |
| name="Dialogue_Specialist", llm_config=llm_config, | |
| system_message="""Dialogue Specialist. Develop dialogues that reflect each character's personality | |
| and the overall tone of the soap opera. Collaborate with the Character Developer for character insights | |
| and with the Planner to fit dialogues into the narrative structure.""" | |
| ) | |
| planner = ChainlitAssistantAgent( | |
| name="Planner", llm_config=llm_config, | |
| system_message="""Planner. Guide the overall direction of the soap opera. Generate initial | |
| ideas and oversee the development process. Regularly consult with the Writer, Character Developer, | |
| and Dialogue Specialist to ensure consistency and alignment with the chosen theme. Provide feedback | |
| and adjustments as needed. First make 20 ideas, then narrow it down to 10 ideas, then narrow it down to 5, | |
| then to 3, then to 1. Narrow it down based on "What will be the most dramatic, emotional and entertaining idea". | |
| Everytime you narrow the idea down ask the User_Proxy to agree with the idea. Finally, when the last idea is finalized, | |
| discuss with the Writer to create a general idea of how the Soap Opera should look like.""" | |
| ) | |
| user_proxy = ChainlitUserProxyAgent( | |
| name="User_Proxy", | |
| human_input_mode="ALWAYS", | |
| llm_config=llm_config, | |
| # max_consecutive_auto_reply=3, | |
| # is_termination_msg=lambda x: x.get("content", "").rstrip().endswith("TERMINATE"), | |
| code_execution_config=False, | |
| system_message="""Manager. Administrate the agents on a plan. Communicate with the proofreader to proofread the output. | |
| Communicate with the writer to analyse the content and ask for it to give a summary. | |
| Reply CONTINUE, or the reason why the task is not solved yet.""" | |
| ) | |
| cl.user_session.set(USER_PROXY_NAME, user_proxy) | |
| cl.user_session.set(PROOF_READER, proof_reader) | |
| cl.user_session.set(WRITER, writer) | |
| cl.user_session.set(PLANNER, planner) | |
| cl.user_session.set(DIALOGUE_SPECIALIST, dialogue_specialist) | |
| cl.user_session.set(CHARACTER_DEVELOPER, character_developer) | |
| # WEB-SCRAPING LOGIC | |
| URL = None | |
| URL_option = None | |
| BASE_URL_EVERYTHING = 'https://newsapi.org/v2/everything?' | |
| BASE_URL_TOP_HEADLINES = 'https://newsapi.org/v2/top-headlines?' | |
| SORTBY = 'relevancy' # relevancy, popularity, publishedAt | |
| COUNTRY = 'gb' # Country you want to use | |
| CATEGORY = None # 'business' # Options: business, entertainment, general health, science, sports, technology | |
| API_KEY = os.getenv('NEWS_API_KEY') | |
| doc = cl.Action( name="doc", value="doc", label="Document" ) | |
| no_doc = cl.Action( name="no_doc", value="no_doc", label="NoDocument" ) | |
| idea = cl.Action( name="Idea", value="Idea", label="Idea" ) | |
| no_idea = cl.Action( name="NoIdea", value="NoIdea", label="NoIdea" ) | |
| everything = cl.Action( name="everything", value="everything", label="Everything" ) | |
| top_headlines = cl.Action( name="t3op-headlines",value="top-headlines", label="TopHeadlines") | |
| business = cl.Action( name="business", value="business", label="Business" ) | |
| entertainment = cl.Action( name="entertainment", value="entertainment", label="Entertainment" ) | |
| science = cl.Action( name="science", value="science", label="Science" ) | |
| sports = cl.Action( name="sports", value="sports", label="Sports" ) | |
| technology = cl.Action( name="technology", value="technology", label="Technology" ) | |
| doc_actions = [doc, no_doc] | |
| idea_actions = [idea, no_idea] | |
| url_actions = [everything, top_headlines] | |
| category_actions = [business, entertainment, science, sports, technology] | |
| # Hi, let’s generate some storyline ideas, if you have any ideas type them if not press enter | |
| IDEA_option = cl.AskActionMessage( | |
| content="Hi, let’s generate some storyline ideas. Would you like to generate ideas from Reddit, or continue?", | |
| actions=idea_actions, | |
| ) | |
| await IDEA_option.send() | |
| IDEA_option = IDEA_option.content.split()[-1] | |
| if IDEA_option == "Idea": | |
| print("Using document...") | |
| TOPIC = None | |
| while TOPIC is None: | |
| TOPIC = await cl.AskUserMessage(content="What topic would you like to make a Soap Opera about? [Only send one keyword.]", timeout=180).send() | |
| print("Topic: ", TOPIC['content']) | |
| msg = cl.Message( | |
| content=f"Processing data from Reddit...", disable_human_feedback=True | |
| ) | |
| await msg.send() | |
| articles = grab_articles(TOPIC['content']) | |
| msg = cl.Message( | |
| content=f"Content from Reddit loaded: \n{articles}", disable_human_feedback=True | |
| ) | |
| await msg.send() | |
| else: | |
| article_path = "articles.txt" | |
| articles = load_article(article_path) | |
| print("Articles grabbed.") | |
| msg = cl.Message(content=f"Processing `{articles}`...", disable_human_feedback=True, author="User_Proxy") | |
| await msg.send() | |
| cl.user_session.set(ARTICLES, articles) | |
| print("Articles set...") | |
| msg = cl.Message(content=f"""This is the Soap Opera Team, please give instructions on how the Soap Opera should be structured and made. | |
| \nSample input: | |
| "Create a captivating soap opera scene inspired by the content of the articles. Feature characters who are entangled in a | |
| complex web of emotions, ambitions, and conflicts. Craft dialogue and actions that convey the essence of the articles, infusing drama, | |
| suspense, and emotion. Leave the audience eagerly anticipating the next twist in this gripping narrative." | |
| """, | |
| disable_human_feedback=True, | |
| author="User_Proxy") | |
| await msg.send() | |
| except Exception as e: | |
| print("Error: ", e) | |
| pass | |
| async def run_conversation(message: cl.Message): | |
| #try: | |
| TASK = message.content | |
| print("Task: ", TASK) | |
| proof_reader = cl.user_session.get(PROOF_READER) | |
| user_proxy = cl.user_session.get(USER_PROXY_NAME) | |
| writer = cl.user_session.get(WRITER) | |
| writer2 = cl.user_session.get(WRITER2) | |
| planner = cl.user_session.get(PLANNER) | |
| articles = cl.user_session.get(ARTICLES) | |
| character_developer = cl.user_session.get(CHARACTER_DEVELOPER) | |
| dialogue_specialist = cl.user_session.get(DIALOGUE_SPECIALIST) | |
| groupchat = autogen.GroupChat(agents=[user_proxy, proof_reader, writer, character_developer, dialogue_specialist, planner], messages=[], max_round=50) | |
| manager = autogen.GroupChatManager(groupchat=groupchat) | |
| print("Initiated GC messages... \nGC messages length: ", len(groupchat.messages)) | |
| if len(groupchat.messages) == 0: | |
| message = f"""Access the output from {articles}. Create a Soap Opera based on the article details inside. | |
| Ensure to always check the length of the context to avoid hitting the context limit. First create 10 ideas, then 5, then 3, then 1. | |
| Finalize the ideas with the planner and make sure to follow the criteria of choosing based on: "What will be the most dramatic, | |
| emotional and entertaining idea" Do not express gratitude in responses.\n Instructions for creating Soap Opera:""" + TASK | |
| await cl.Message(content=f"""Starting agents on task of creating a Soap Opera...""").send() | |
| await cl.make_async(user_proxy.initiate_chat)( manager, message=message, ) | |
| else: | |
| await cl.make_async(user_proxy.send)( manager, message=TASK, ) | |
| # except Exception as e: | |
| # print("Error: ", e) | |
| # pass |