Spaces:
Sleeping
Sleeping
| import openai | |
| import time | |
| from datetime import datetime | |
| import requests | |
| import json | |
| import streamlit as st | |
| news_api_key = st.secrets["news_api_key"] | |
| openai_api_key = st.secrets["openai_api_key"] | |
| client = openai.OpenAI(api_key = openai_api_key) | |
| model = "gpt-3.5-turbo-16k" | |
| def get_news(topic): | |
| url = ( | |
| f"https://newsapi.org/v2/everything?q={topic}&apiKey={news_api_key}&pageSize=5" | |
| ) | |
| try: | |
| response = requests.get(url) | |
| if response.status_code == 200: | |
| news = json.dumps(response.json(), indent=4) | |
| news_json = json.loads(news) | |
| data = news_json | |
| # Access all the fiels == loop through | |
| status = data["status"] | |
| total_results = data["totalResults"] | |
| articles = data["articles"] | |
| final_news = [] | |
| # Loop through articles | |
| for article in articles: | |
| source_name = article["source"]["name"] | |
| author = article["author"] | |
| title = article["title"] | |
| description = article["description"] | |
| url = article["url"] | |
| content = article["content"] | |
| title_description = f""" | |
| Title: {title}, | |
| Author: {author}, | |
| Source: {source_name}, | |
| Description: {description}, | |
| URL: {url} | |
| """ | |
| final_news.append(title_description) | |
| return final_news | |
| else: | |
| return [] | |
| except requests.exceptions.RequestException as e: | |
| print("Error occured during API Request", e) | |
| class AssistantManager (): | |
| thread_id = None | |
| assistant_id = None | |
| def __init__(self, model: str = model, assistant_id = None, thread_id = None): | |
| self.client = client | |
| self.model = model | |
| self.run = None | |
| self.summary = None | |
| self.assistant_id = assistant_id | |
| self.thread = thread_id | |
| # Retrieve existing thread if IDs are already set | |
| if self.thread_id: | |
| self.thread = self.client.beta.threads.retrieve( | |
| thread_id=self.thread_id | |
| ) | |
| def create_assistant(self, name, instructions, tools): | |
| # Retrieve existing assistant and thread if IDs are already set | |
| try: | |
| self.assistant = self.client.beta.assistants.retrieve( | |
| assistant_id=self.assistant_id | |
| ) | |
| except openai.NotFoundError: | |
| # If the assistant doesn't exist, create a new one | |
| assistant_obj = self.client.beta.assistants.create( | |
| name=name, instructions=instructions, tools=tools, model=self.model | |
| ) | |
| self.assistant_id = assistant_obj.id | |
| self.assistant = assistant_obj | |
| print(f"Created new assistant with ID: {self.assistant.id}") | |
| except Exception as e: | |
| # If any other error occurs, raise the error | |
| raise e | |
| def create_thread(self): | |
| if not self.thread: | |
| thread_obj = self.client.beta.threads.create() | |
| self.thread_id = thread_obj.id | |
| self.thread = thread_obj | |
| print(f"ThreadID::: {self.thread.id}") | |
| def add_message_to_thread(self, role, content): | |
| if self.thread: | |
| self.client.beta.threads.messages.create( | |
| thread_id=self.thread.id, role=role, content=content | |
| ) | |
| def run_assistant(self, instructions): | |
| if self.thread and self.assistant: | |
| self.run = self.client.beta.threads.runs.create( | |
| thread_id=self.thread.id, | |
| assistant_id=self.assistant.id, | |
| instructions=instructions, | |
| ) | |
| def process_message(self): | |
| if self.thread: | |
| messages = self.client.beta.threads.messages.list(thread_id=self.thread.id) | |
| summary = [] | |
| last_message = messages.data[0] | |
| role = last_message.role | |
| response = last_message.content[0].text.value | |
| summary.append(response) | |
| self.summary = "\n".join(summary) | |
| print(f"SUMMARY-----> {role.capitalize()}: ==> {response}") | |
| def call_required_functions(self, required_actions): | |
| if not self.run: | |
| return | |
| tool_outputs = [] | |
| for action in required_actions["tool_calls"]: | |
| func_name = action["function"]["name"] | |
| arguments = json.loads(action["function"]["arguments"]) | |
| if func_name == "get_news": | |
| output = get_news(topic=arguments["topic"]) | |
| print(f"STUFFFFF;;;;{output}") | |
| final_str = "" | |
| for item in output: | |
| final_str += "".join(item) | |
| tool_outputs.append({"tool_call_id": action["id"], "output": final_str}) | |
| else: | |
| raise ValueError(f"Unknown function: {func_name}") | |
| print("Submitting outputs back to the Assistant...") | |
| self.client.beta.threads.runs.submit_tool_outputs( | |
| thread_id=self.thread.id, run_id=self.run.id, tool_outputs=tool_outputs | |
| ) | |
| # for streamlit | |
| def get_summary(self): | |
| return self.summary | |
| def wait_for_completion(self): | |
| if self.thread and self.run: | |
| while True: | |
| time.sleep(5) | |
| run_status = self.client.beta.threads.runs.retrieve( | |
| thread_id=self.thread.id, run_id=self.run.id | |
| ) | |
| print(f"RUN STATUS:: {run_status.model_dump_json(indent=4)}") | |
| if run_status.status == "completed": | |
| self.process_message() | |
| break | |
| elif run_status.status == "requires_action": | |
| print("FUNCTION CALLING NOW...") | |
| self.call_required_functions( | |
| required_actions=run_status.required_action.submit_tool_outputs.model_dump() | |
| ) | |
| # Run the steps | |
| def run_steps(self): | |
| run_steps = self.client.beta.threads.runs.steps.list( | |
| thread_id=self.thread.id, run_id=self.run.id | |
| ) | |
| print(f"Run-Steps::: {run_steps}") | |
| return run_steps.data | |
| def main(): | |
| manager = AssistantManager(assistant_id = 'asst_7adqkvmsU2ovkQsPiBFbh5yV') | |
| # asst_lGnxol2MqmLr4x6OA4xJYXAq me | |
| # asst_7adqkvmsU2ovkQsPiBFbh5yV Charles | |
| # Streamlit interface | |
| st.title("News Summariser") | |
| # Note that API key's running out of budget | |
| contact_url = "https://www.linkedin.com/in/linhvuu" | |
| st.write("If no result returns, it means I am running out of energy. Please contact [Linh Vuu](%s) to wake me up." % contact_url) | |
| with st.form(key="user_input_form"): | |
| instructions = st.text_input("Enter topic:") | |
| submit_button = st.form_submit_button(label="Find and summarise news related to the topic above") | |
| if submit_button: | |
| manager.create_assistant( | |
| name="News Summariser", | |
| instructions="You are a personal article summarizer Assistant who knows how to take a list of article's titles and descriptions and then write a short summary of all the news articles", | |
| tools=[ | |
| { | |
| "type": "function", | |
| "function": { | |
| "name": "get_news", | |
| "description": "Get the list of articles/news for the given topic", | |
| "parameters": { | |
| "type": "object", | |
| "properties": { | |
| "topic": { | |
| "type": "string", | |
| "description": "The topic for the news, e.g. bitcoin", | |
| } | |
| }, | |
| "required": ["topic"], | |
| }, | |
| }, | |
| } | |
| ], | |
| ) | |
| manager.create_thread() | |
| # Add the message and run the assistant | |
| manager.add_message_to_thread( | |
| role="user", content=f"summarize the news on this topic {instructions}?" | |
| ) | |
| manager.run_assistant(instructions="Summarize the news") | |
| # Wait for completions and process messages | |
| manager.wait_for_completion() | |
| summary = manager.get_summary() | |
| st.write(summary) | |
| if __name__ == "__main__": | |
| main() |