File size: 4,811 Bytes
fdbec52
90263a4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
45e0afe
 
 
 
90263a4
 
 
 
 
45e0afe
 
 
 
 
 
 
 
 
 
 
90263a4
45e0afe
 
 
90263a4
45e0afe
 
 
90263a4
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
from config import *

@st.cache_data()
def make_request(user_input:str, 
                 short_history:list, 
                 chat_history:list):

    """Sends the conversation to the Hugging Face chat-completion API and
    appends the new exchange to the chat history.

    Args:
        user_input (str): The raw text the user typed.
        short_history (list): Trimmed message list (dicts with 'role'/'content')
            sent to the model. Mutated in place when an action is detected.
        chat_history (list): Full conversation history. The user message and
            the model reply are appended to it.

    Returns:
        list: The updated ``chat_history``.

    Note:
        On API failure the Streamlit script is halted via ``st.stop()``.
    """

    client = InferenceClient(
        MODEL_PATH,
        token=HUGGING_FACE_API_KEY,
        )

    def _chat(messages: list):
        # Single place for the API call so the retry path cannot drift
        # from the primary path (originally duplicated verbatim).
        return client.chat_completion(
            messages=messages,
            max_tokens=5000,
            stream=False,
            )

    try:
        response = _chat(short_history)

        # get the model's text reply
        message = response.choices[0].message['content']

        # analyse the content to see if there is an action to perform
        try:
            perform_actions = look_for_actions(user_input, message)

        except Exception as e:
            st.info(f"An error occurred while looking for actions: {e}")
            perform_actions = (False, None)

        # if there was an action to perform, resubmit the question to the chatbot:
        if perform_actions[0]:

            # replace the last message in the short history with the enriched prompt
            short_history[-1] = {'role':'user', 'content':perform_actions[1]}

            # replace the first message with the system prompt without url analysis
            short_history[0] = {'role':'system', 'content':SYSTEM_PROMPT_NO_URL}

            # wait a little bit to avoid the API rate limit
            time.sleep(1)

            # make the request again with the updated history
            response = _chat(short_history)

        # append the user message and the model reply to the full history
        chat_history.append({'content':user_input, 'role':'user'})
        chat_history.append(response.choices[0].message) # append the response

        return chat_history

    except Exception as e:
        st.error(f"An error occurred: {e}")
        st.stop()

@st.cache_data()
def get_site_content(url:str):

    """Receives a URL and returns the content of the site: the page's meta
    tags followed by the visible body text, with scripts/styles and empty
    lines stripped.

    Args:
        url (str): Address of the page to fetch.

    Returns:
        str: Meta-tag lines followed by the page's visible text.

    Raises:
        requests.RequestException: On network failure, timeout, or an
            HTTP error status.
    """

    # create an user agent so sites do not reject the request as a bot
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'
    }

    # get the site content; a timeout keeps the app from hanging forever
    # on an unresponsive host, and raise_for_status surfaces HTTP errors
    # instead of silently parsing an error page
    response = requests.get(url, headers=headers, timeout=30)
    response.raise_for_status()
    soup = BeautifulSoup(response.text, 'html.parser')

    # remove styles and scripts
    for script in soup(["script", "style"]):
        script.extract()

    # let the meta descriptions of the header and all the content inside the body
    # for the meta tags, get the tag itself and its content
    # (guard: malformed pages may have no <head> at all)
    meta_tags = soup.head.find_all('meta') if soup.head is not None else []
    meta_tags_text = ''
    for tag in meta_tags:
        meta_tags_text += f'<{tag.name} {tag.attrs}>\n'

    # get the body text (guard: fall back to the whole document if <body> is missing)
    body_text = soup.body.get_text() if soup.body is not None else soup.get_text()

    # join the meta tags and the body text
    text = f'{meta_tags_text}\n{body_text}'

    # remove empty lines
    text = os.linesep.join([s for s in text.splitlines() if s])

    return text

def look_for_actions(user_input:str, message:str):

    """Receives a message and looks for the pattern ###ACTION###function###URL###.

    Args:
        user_input (str): The original user question.
        message (str): The model's reply, possibly containing an action marker.

    Returns:
        tuple: ``(True, enriched_prompt)`` when a getSiteContent action was
        performed, otherwise ``(False, None)``.
    """

    # check if the pattern is in the message. 
    if '###' in message:

        # split the message by the pattern ###ACTION###function###URL### to get the URL and the action
        # expected layout: ['', 'ACTION', 'getSiteContent', 'URL', '...']
        split_string = message.split('###')

        if 'getSiteContent' in message:

            # guard: a malformed marker (fewer than 4 segments) would raise
            # IndexError below — treat it as "no action"
            if len(split_string) < 4:
                return (False, None)

            st.info("I need to visit the site to provide the answer. Please wait...")

            url = split_string[3].strip()

            # remove everything inside ### and ### (including the ###) from the user_input
            user_input = re.sub(r'###.*?###', '', user_input)

            # add the content of the website to the message
            url_content = f'{user_input}. Content of the site {url}:\n{get_site_content(url)}'

            # check if the url_content is too long. If so, keep trimming the text until it is not too long
            while get_token_amount(url_content) > 5000:
                url_content = url_content[:-100]
            
            return (True, url_content)

    # if there is no action to perform, return None   
    return (False, None)

@st.cache_data(ttl=3600)
def get_token_amount(text,
                     model_name="gpt-4") -> int:
    
    """Counts the tokens in a text with the tiktoken library.

    Even though the chatbot uses a Llama model, the GPT-4 tokenizer is
    used as an approximation.

    Args:
        text (str): The text to tokenize.
        model_name (str): The model whose tokenizer to use. Defaults to "gpt-4".

    Returns:
        int: The number of tokens in the text.
    """

    # look up the tokenizer for the model and count the encoded tokens
    return len(tiktoken.encoding_for_model(model_name).encode(text))