# soap-opera / app.py
# Author: antoineross
# Last commit: "increasing the timeout" (5c0e29d)
"""
This is the source code for the webscraper agents that use ChainLit
Features:
- Uses the top N Google search results for a keyword, creates a JSON file, and uploads it to Google Cloud Object Storage or stores it locally
- Continuous messaging
- Multithreading
Written by: Antoine Ross - October 2023.
"""
import os
from typing import Dict, Optional, Union
from dotenv import load_dotenv, find_dotenv
import chainlit as cl
from chainlit.client.base import ConversationDict
from chainlit.types import AskFileResponse
from langchain.document_loaders import PyPDFLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains import ConversationalRetrievalChain
import autogen
from autogen import Agent, AssistantAgent, UserProxyAgent, config_list_from_json
from webscrapereddit import grab_articles
load_dotenv(find_dotenv())
# Greeting text. No placeholders are interpolated, so a plain string is used.
WELCOME_MESSAGE = """Soap Opera Team 👾
\n\n
Let's make some soap opera! What topic do you want to search?
"""

# Agents
# These names double as the keys under which each agent object is stored in
# the Chainlit user session (see on_chat_start / run_conversation).
USER_PROXY_NAME = "Query Agent"
PROOF_READER = "Proofreader"
WRITER = "Writer"
WRITER2 = "Writer2"
PLANNER = "Planner"
CHARACTER_DEVELOPER = "Character Developer"
DIALOGUE_SPECIALIST = "Dialogue Specialist"

# Session key for the loaded article text. This used to be None, which only
# worked by accident as a dictionary key; a real string key keeps it
# distinguishable from a "missing key" lookup and matches the other keys.
ARTICLES = "Articles"
# Module-level splitter shared by process_file to chunk loaded documents.
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=8192,
    chunk_overlap=100,
)
def load_article(file_path):
    """Return the full text of the file at *file_path*.

    If the file does not exist, "File not found" is printed and None is
    returned instead of raising.
    """
    try:
        with open(file_path, 'r') as handle:
            return handle.read()
    except FileNotFoundError:
        print("File not found")
    return None
# Function to process the file
def process_file(file: AskFileResponse):
    """Load an uploaded text or PDF file, split it into chunks and cache them.

    The upload's bytes are written to a temporary file, loaded with the
    loader matching ``file.type``, split with the module-level
    ``text_splitter``, and each chunk's ``source`` metadata is set to
    ``source_<index>``. The chunks are stored in the user session under
    ``"docs"`` and returned.

    Args:
        file: The uploaded file; ``file.type`` must be ``"text/plain"`` or
            ``"application/pdf"`` and ``file.content`` holds its bytes.

    Returns:
        The list of split documents.

    Raises:
        ValueError: If ``file.type`` is not a supported MIME type.
    """
    import os
    import tempfile

    loaders = {
        "text/plain": TextLoader,
        "application/pdf": PyPDFLoader,
    }
    Loader = loaders.get(file.type)
    if Loader is None:
        # Previously this fell through and crashed later on an unbound name.
        raise ValueError(f"Unsupported file type: {file.type!r}")

    # Write and close the temp file before loading so the bytes are flushed
    # to disk; the handle is named temp_file so it no longer shadows the
    # tempfile module.
    with tempfile.NamedTemporaryFile(mode="wb", delete=False) as temp_file:
        temp_file.write(file.content)
        temp_path = temp_file.name

    try:
        documents = Loader(temp_path).load()
    finally:
        # delete=False means nothing else will clean this up.
        os.remove(temp_path)

    docs = text_splitter.split_documents(documents)
    for i, doc in enumerate(docs):
        doc.metadata["source"] = f"source_{i}"
    cl.user_session.set("docs", docs)
    return docs
async def ask_helper(func, **kwargs):
    """Keep asking until a truthy answer comes back.

    ``func(**kwargs)`` is constructed afresh and its ``send()`` awaited on
    every attempt; the first truthy result is returned.
    """
    while True:
        answer = await func(**kwargs).send()
        if answer:
            return answer
class ChainlitAssistantAgent(AssistantAgent):
    """
    AutoGen AssistantAgent that echoes every outgoing message to the Chainlit UI.
    """

    def send(
        self,
        message: Union[Dict, str],
        recipient: Agent,
        request_reply: Optional[bool] = None,
        silent: Optional[bool] = False,
    ) -> bool:
        """Post the message to the Chainlit chat, then forward it to the parent ``send``.

        The UI message is authored as this agent and names the recipient.
        Nothing is returned explicitly after delegating to the parent class.
        """
        ui_message = cl.Message(
            content=f'*Sending message to "{recipient.name}":*\n\n{message}',
            author=self.name,
        )
        cl.run_sync(ui_message.send())

        super().send(
            message=message,
            recipient=recipient,
            request_reply=request_reply,
            silent=silent,
        )
class ChainlitUserProxyAgent(UserProxyAgent):
    """
    AutoGen UserProxyAgent backed by the Chainlit UI: human input is collected
    through action buttons and chat prompts instead of the console.
    """

    def get_human_input(self, prompt: str) -> str:
        """Obtain the human's reply to *prompt* through Chainlit.

        For the chat-manager feedback prompt, the user first picks one of
        three actions: "continue" returns an empty string, "exit" returns
        "exit", and "feedback" falls through to a free-text question. Any
        other prompt goes straight to the free-text question. The typed
        answer is returned stripped of surrounding whitespace.
        """
        feedback_prefix = (
            "Provide feedback to chat_manager. Press enter to skip and use auto-reply"
        )
        if prompt.startswith(feedback_prefix):
            choices = [
                cl.Action(name="continue", value="continue", label="✅ Continue"),
                cl.Action(name="feedback", value="feedback", label="💬 Provide feedback"),
                cl.Action(name="exit", value="exit", label="🔚 Exit Conversation"),
            ]
            picked = cl.run_sync(
                ask_helper(
                    cl.AskActionMessage,
                    content="Continue or provide feedback?",
                    actions=choices,
                )
            )
            if picked.get("value") == "continue":
                return ""
            if picked.get("value") == "exit":
                return "exit"

        # Either a different prompt or the user chose to give feedback.
        typed = cl.run_sync(
            ask_helper(cl.AskUserMessage, content=prompt, timeout=120)
        )
        return typed["content"].strip()

    def send(
        self,
        message: Union[Dict, str],
        recipient: Agent,
        request_reply: Optional[bool] = None,
        silent: Optional[bool] = False,
    ):
        """Post the message to the Chainlit chat, then forward it to the parent ``send``."""
        ui_message = cl.Message(
            content=f'*Sending message to "{recipient.name}"*:\n\n{message}',
            author=self.name,
        )
        cl.run_sync(ui_message.send())

        super().send(
            message=message,
            recipient=recipient,
            request_reply=request_reply,
            silent=silent,
        )
# @cl.oauth_callback
# def oauth_callback(
# provider_id: str,
# token: str,
# raw_user_data: Dict[str, str],
# default_app_user: cl.AppUser,
# ) -> Optional[cl.AppUser]:
# return default_app_user
# @cl.on_chat_resume
# async def on_chat_resume(conversation: ConversationDict):
# # Access the root messages
# root_messages = [m for m in conversation["messages"] if m["parentId"] == None]
# # Access the user_session
# cl.user_session.get("chat_profile")
# # Or just pass if you do not need to run any custom logic
# pass
# LLM configuration list built from the local .env file: the model below is
# mapped to the OPENAI_API_KEY variable and the result is filtered to that
# model only.
_LLM_MODEL = "gpt-4-1106-preview"
config_list = autogen.config_list_from_dotenv(
    dotenv_file_path='.env',
    model_api_key_map={_LLM_MODEL: "OPENAI_API_KEY"},
    filter_dict={"model": {_LLM_MODEL}},
)
@cl.action_callback("confirm_action")
async def on_action(action: cl.Action):
    """Handle the "confirm_action" button.

    Accepts the values "everything" and "top-headlines"; anything else
    produces an "Invalid action" error message. For a valid value, the
    action buttons of the message stored under "url_actions" in the session
    (if any) are removed, the session entry is cleared, and the selection is
    echoed back as a chat message.
    """
    selected = action.value
    if selected not in ("everything", "top-headlines"):
        await cl.ErrorMessage(content="Invalid action").send()
        return
    content = "everything" if selected == "everything" else "top_headlines"

    previous = cl.user_session.get("url_actions")  # type: cl.Message
    if previous:
        await previous.remove_actions()
        cl.user_session.set("url_actions", None)

    await cl.Message(content=content).send()
def _build_agents(llm_config):
    """Create the soap-opera agents and return them keyed by their session key."""
    proof_reader = ChainlitAssistantAgent(
        name="Proof_Reader", llm_config=llm_config,
        system_message="""Proof_Reader. Review and refine the content produced by the Writer and
Dialogue Specialist for grammar, style, and coherence. Provide feedback directly to them
and collaborate with the Planner to ensure the content aligns with the overall plan."""
    )
    writer = ChainlitAssistantAgent(
        name="Writer", llm_config=llm_config,
        system_message="""Develop the main storyline and setting for the soap opera. Collaborate
with the Character Developer for character integration and the Planner for aligning your
ideas with the overall concept. Share your drafts with the Dialogue Specialist for dialogue
development and the Proof_Reader for quality checks. Write down the final output with headings and subheadings."""
    )
    character_developer = ChainlitAssistantAgent(
        name="Character_Developer", llm_config=llm_config,
        system_message="""Character Developer. Design comprehensive character profiles, including backstories,
motivations, and relationships. Advise the Planner to integrate characters into the storyline
and with the Dialogue Specialist to ensure character-consistent dialogues."""
    )
    dialogue_specialist = ChainlitAssistantAgent(
        name="Dialogue_Specialist", llm_config=llm_config,
        system_message="""Dialogue Specialist. Develop dialogues that reflect each character's personality
and the overall tone of the soap opera. Collaborate with the Character Developer for character insights
and with the Planner to fit dialogues into the narrative structure."""
    )
    planner = ChainlitAssistantAgent(
        name="Planner", llm_config=llm_config,
        system_message="""Planner. Guide the overall direction of the soap opera. Generate initial
ideas and oversee the development process. Regularly consult with the Writer, Character Developer,
and Dialogue Specialist to ensure consistency and alignment with the chosen theme. Provide feedback
and adjustments as needed. First make 20 ideas, then narrow it down to 10 ideas, then narrow it down to 5,
then to 3, then to 1. Narrow it down based on "What will be the most dramatic, emotional and entertaining idea".
Everytime you narrow the idea down ask the User_Proxy to agree with the idea. Finally, when the last idea is finalized,
discuss with the Writer to create a general idea of how the Soap Opera should look like."""
    )
    user_proxy = ChainlitUserProxyAgent(
        name="User_Proxy",
        human_input_mode="ALWAYS",
        llm_config=llm_config,
        # max_consecutive_auto_reply=3,
        # is_termination_msg=lambda x: x.get("content", "").rstrip().endswith("TERMINATE"),
        code_execution_config=False,
        system_message="""Manager. Administrate the agents on a plan. Communicate with the proofreader to proofread the output.
Communicate with the writer to analyse the content and ask for it to give a summary.
Reply CONTINUE, or the reason why the task is not solved yet."""
    )
    return {
        USER_PROXY_NAME: user_proxy,
        PROOF_READER: proof_reader,
        WRITER: writer,
        PLANNER: planner,
        DIALOGUE_SPECIALIST: dialogue_specialist,
        CHARACTER_DEVELOPER: character_developer,
    }


async def _collect_articles():
    """Ask the user where ideas come from and return the source article text.

    If the user picks "Idea", a topic keyword is requested and the articles
    are fetched with grab_articles; otherwise (including when the question
    times out) the local "articles.txt" file is read via load_article.
    """
    # Hi, let’s generate some storyline ideas, if you have any ideas type them if not press enter
    choice = await cl.AskActionMessage(
        content="Hi, let’s generate some storyline ideas. Would you like to generate ideas from Reddit, or continue?",
        actions=[
            cl.Action(name="Idea", value="Idea", label="Idea"),
            cl.Action(name="NoIdea", value="NoIdea", label="NoIdea"),
        ],
    ).send()

    # Read the selection from the response itself (as get_human_input does)
    # rather than parsing the message text that Chainlit rewrites afterwards.
    if not (choice and choice.get("value") == "Idea"):
        return load_article("articles.txt")

    print("Fetching ideas from Reddit...")
    topic = None
    while topic is None:
        topic = await cl.AskUserMessage(content="What topic would you like to make a Soap Opera about? [Only send one keyword.]", timeout=180).send()
    print("Topic: ", topic['content'])

    await cl.Message(
        content="Processing data from Reddit...", disable_human_feedback=True
    ).send()
    # grab_articles is a synchronous call; run it through make_async so the
    # event loop is not blocked while it works.
    articles = await cl.make_async(grab_articles)(topic['content'])
    await cl.Message(
        content=f"Content from Reddit loaded: \n{articles}", disable_human_feedback=True
    ).send()
    return articles


@cl.on_chat_start
async def on_chat_start():
    """Set up a new chat session.

    Creates the agents and stores them in the user session, obtains the
    source articles (from Reddit or the local file), stores them under the
    ARTICLES key, and finally asks the user for instructions. Any failure is
    printed and also reported to the user as an error message instead of
    being silently dropped.
    """
    OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
    try:
        # app_user = cl.user_session.get("user")
        # await cl.Message(f"Hello {app_user.username}").send()
        # config_list = config_list_from_json(env_or_file="OAI_CONFIG_LIST")
        llm_config = {"config_list": config_list, "api_key": OPENAI_API_KEY, "seed": 42, "request_timeout": 180, "retry_wait_time": 60}

        for session_key, agent in _build_agents(llm_config).items():
            cl.user_session.set(session_key, agent)

        articles = await _collect_articles()
        print("Articles grabbed.")
        await cl.Message(content=f"Processing `{articles}`...", disable_human_feedback=True, author="User_Proxy").send()
        cl.user_session.set(ARTICLES, articles)
        print("Articles set...")

        await cl.Message(content="""This is the Soap Opera Team, please give instructions on how the Soap Opera should be structured and made.
\nSample input:
"Create a captivating soap opera scene inspired by the content of the articles. Feature characters who are entangled in a
complex web of emotions, ambitions, and conflicts. Craft dialogue and actions that convey the essence of the articles, infusing drama,
suspense, and emotion. Leave the audience eagerly anticipating the next twist in this gripping narrative."
""",
            disable_human_feedback=True,
            author="User_Proxy").send()
    except Exception as e:
        # Top-level boundary: keep the app alive, but tell the user as well
        # as the console that start-up failed.
        print("Error: ", e)
        await cl.ErrorMessage(content=f"Failed to start the Soap Opera Team: {e}").send()
# Session key under which the GroupChatManager is kept between messages.
_GROUP_CHAT_MANAGER = "Group Chat Manager"


@cl.on_message
async def run_conversation(message: cl.Message):
    """Route a user message to the agent group chat.

    On the first message of a session a GroupChat and GroupChatManager are
    created, stored in the user session, and the chat is initiated with a
    prompt that embeds the stored articles plus the user's instructions. On
    later messages the stored manager is reused and the text is sent to it
    as a follow-up. (Previously a fresh, empty group chat was built for
    every message, so the follow-up branch could never run.)
    """
    task = message.content
    print("Task: ", task)

    user_proxy = cl.user_session.get(USER_PROXY_NAME)
    if user_proxy is None:
        # Agents are created in on_chat_start; without them nothing can run.
        await cl.ErrorMessage(content="The agents are not initialised. Please restart the chat.").send()
        return

    manager = cl.user_session.get(_GROUP_CHAT_MANAGER)
    if manager is not None:
        # Keyword arguments matter here: send() takes (message, recipient),
        # so passing the manager positionally would collide with message=.
        await cl.make_async(user_proxy.send)(message=task, recipient=manager)
        return

    proof_reader = cl.user_session.get(PROOF_READER)
    writer = cl.user_session.get(WRITER)
    planner = cl.user_session.get(PLANNER)
    articles = cl.user_session.get(ARTICLES)
    character_developer = cl.user_session.get(CHARACTER_DEVELOPER)
    dialogue_specialist = cl.user_session.get(DIALOGUE_SPECIALIST)

    groupchat = autogen.GroupChat(agents=[user_proxy, proof_reader, writer, character_developer, dialogue_specialist, planner], messages=[], max_round=50)
    manager = autogen.GroupChatManager(groupchat=groupchat)
    cl.user_session.set(_GROUP_CHAT_MANAGER, manager)
    print("Initiated GC messages... \nGC messages length: ", len(groupchat.messages))

    opening = f"""Access the output from {articles}. Create a Soap Opera based on the article details inside.
Ensure to always check the length of the context to avoid hitting the context limit. First create 10 ideas, then 5, then 3, then 1.
Finalize the ideas with the planner and make sure to follow the criteria of choosing based on: "What will be the most dramatic,
emotional and entertaining idea" Do not express gratitude in responses.\n Instructions for creating Soap Opera:""" + task
    await cl.Message(content="""Starting agents on task of creating a Soap Opera...""").send()
    await cl.make_async(user_proxy.initiate_chat)(manager, message=opening)