Ronio Jerico Roque
refactor: remove unused imports and clean up code in TargetMarketAnalyst class
a4e8282
import streamlit as st
import requests
from dotenv import load_dotenv
import time
from helper.upload_File import uploadFile
import json
import requests
class TargetMarketAnalyst:
def __init__(self, model_url, analyst_name, data_src, analyst_description):
self.model_url = model_url
self.analyst_name = analyst_name
self.data_src = data_src
self.analyst_description = analyst_description
self.initialize()
self.row1()
def initialize(self):
# FOR ENV
load_dotenv()
# AGENT NAME
st.header(self.analyst_name)
# EVALUATION FORM LINK
#url = os.getenv('Link')
#st.write('Evaluation Form: [Link](%s)' % url)
def request_model(self, payload_txt):
response = requests.post(self.model_url, json=payload_txt)
response.raise_for_status()
output = response.json()
sources = self.newsapi.get_sources()
response.raise_for_status()
output = response.json()
text = output["outputs"][0]["outputs"][0]["results"]["text"]["data"]["text"]
text = json.loads(text)
text = text[0]
target_market = text["target_market"]
demographics = text["demographics"]
summary = text["summary"]
with st.expander("News Available", expanded=True, icon="🤖"):
st.write(f"**Target Market**:\n {target_market}\n")
st.write(f"\n**Product / Service Demographics**: {demographics}")
st.write(f"\n**Marketing Message Summary**: {summary}")
return output
def row1(self):
prompt = st.chat_input("How can I help you today?")
#client = genai.Client(api_key="AIzaSyArNeCctdKaWhxprHTnHyISCd0CpKqB5dk")
payload_txt = {
"input_value": f"{prompt}",
"output_type": "chat",
"input_type": "chat",
"tweaks": {
"Agent-jDo0M": {},
"ChatInput-TcV5B": {},
"ChatOutput-woYKj": {},
"URL-iTqUH": {},
"CalculatorComponent-yPwgW": {},
"APIRequest-rDfwC": {},
"TextInput-sID7m": {},
"ParseData-ezi1L": {}
}
}
headers = {
'Content-Type': 'application/json',
}
url = "http://172.17.21.23:7860/api/v1/run/382ac239-3231-4f9b-89fe-f5ee26e4b1eb?stream=false"
if "messages" not in st.session_state:
st.session_state.messages = []
for message in st.session_state.messages:
with st.chat_message(message["role"]):
st.markdown(message["content"])
if prompt:
st.chat_message("user").markdown(prompt)
# Add user message to chat history
st.session_state.messages.append({"role": "user", "content": prompt})
response = requests.post(url, json=payload_txt, headers=headers, params={'stream': 'false'})
response.raise_for_status()
output = response.json()
text = output["outputs"][0]["outputs"][0]["results"]["message"]["text"]
def remove_escape_sequences(text):
return text.replace('\\n', '\n').replace('\\t', '\t').replace('\\r', '\r')
self.text = remove_escape_sequences(json.dumps(text, ensure_ascii=False).strip('"'))
response = f"Echo: {self.text}"
with st.chat_message("assistant"):
response = st.write_stream(self.stream_string(self.text))
# Add assistant response to chat history
st.session_state.messages.append({"role": "assistant", "content": response})
def stream_string(self, text, delay=0.003):
for char in text:
yield char
time.sleep(delay) # Small delay to simulate typing
if __name__ == "__main__":
st.set_page_config(layout="wide")
upload = uploadFile()