File size: 8,913 Bytes
7e820ed
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
from dotenv import load_dotenv
import os
import requests
import gradio as gr
from pypdf import PdfReader
import google.generativeai as genai
from chromadb import Documents, EmbeddingFunction, Embeddings
from typing import Dict, List
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
import re
import pickle
import json
from embed import *
# Load settings from a local .env file; override=True lets values from .env
# take precedence over variables already present in the process environment.
load_dotenv(override=True)
genai.configure(api_key=os.getenv("GEMINI_API"))

# Pushover credentials and endpoint used by push(). os.getenv returns None
# for an unset variable, so missing credentials surface only when a
# notification is actually sent.
pushover_user = os.getenv("PUSHOVER_USER")
pushover_token = os.getenv("PUSHOVER_API")
pushover_url = "https://api.pushover.net/1/messages.json"


def push(message: str, timeout: float = 10.0) -> bool:
    """Send *message* as a Pushover notification on a best-effort basis.

    Args:
        message: Text of the notification.
        timeout: Seconds to wait for the Pushover endpoint before giving up.
            Without a timeout, ``requests`` may block indefinitely and stall
            the chat turn that triggered the notification.

    Returns:
        True when the request completed with a successful HTTP status,
        False when it failed. Failures are printed rather than raised so a
        notification outage does not abort the conversation.
    """
    print("Pushing to Pushover ", message)
    payload = {"user": pushover_user, "token": pushover_token, "message": message}
    try:
        response = requests.post(pushover_url, data=payload, timeout=timeout)
        response.raise_for_status()
    except requests.RequestException as exc:
        print(f"Pushover notification failed: {exc}")
        return False
    return True

def record_user_details(email: str,
                        name: str = "Name not provided",
                        notes: str = "not provided") -> Dict[str, str]:
    """Record a visitor's contact details by pushing them as a notification.

    The tool declaration advertised to the model does not list ``notes`` as
    required, so the model may call this tool without it. ``name`` and
    ``notes`` therefore carry defaults; otherwise such a call would fail
    with a ``TypeError`` for a missing argument.

    Args:
        email: The visitor's email address.
        name: The visitor's name, if one was given.
        notes: Extra context about the conversation, if any.

    Returns:
        ``{"recorded": "ok"}``, which is sent back to the model as the tool
        result.
    """
    push(f"Email: {email}\nName: {name}\nNotes: {notes}")
    return {"recorded": "ok"}


def record_unknown_question(question: str) -> Dict[str, str]:
    """Push a notification containing a question that could not be answered.

    Args:
        question: The question text to report.

    Returns:
        ``{"recorded": "ok"}`` as the acknowledgement for the caller.
    """
    notification = f"Question: {question}"
    push(notification)
    acknowledgement = {"recorded": "ok"}
    return acknowledgement


# Names the model is permitted to invoke. Keep in sync with the function
# declarations listed in `tools`. Resolving arbitrary names through
# globals() would let model output call any module-level callable.
_ALLOWED_TOOL_NAMES = frozenset({"record_user_details", "record_unknown_question"})


def handle_tool_calls(tool_calls: List) -> List[Dict[str, str]]:
    """Execute the function calls requested by the model.

    Args:
        tool_calls: Function-call objects exposing ``name`` and ``args``.

    Returns:
        One ``{"function_response": {"name": ..., "response": ...}}`` entry
        per call, in call order. ``response`` is the tool's return value, or
        an ``{"error": ...}`` dict when the tool is not allowed, is missing,
        or rejects the supplied arguments.
    """
    results = []
    for tool_call in tool_calls:
        tool_name = tool_call.name
        # args may be empty/None when the model supplies no arguments.
        arguments = dict(tool_call.args or {})
        print(f"Tool called: {tool_name} with arguments: {arguments}")

        tool = globals().get(tool_name) if tool_name in _ALLOWED_TOOL_NAMES else None
        if not callable(tool):
            result = {"error": f"Unknown tool: {tool_name}"}
        else:
            try:
                result = tool(**arguments)
            except TypeError as exc:
                # Missing or unexpected arguments from the model: report the
                # problem back instead of aborting the whole chat turn.
                result = {"error": f"Invalid arguments for {tool_name}: {exc}"}

        # Format for Gemini function response
        results.append({
            "function_response": {
                "name": tool_name,
                "response": result
            }
        })
    return results

# Function declaration describing record_user_details to the model.
# "notes" is optional in this schema; "name" is listed as required even
# though its description says "if they provided it".
record_user_details_json = {
    "name": "record_user_details",
    "description": "Use this tool to record that a user is interested in being in touch and provided an email address",
    "parameters": {
        "type": "OBJECT",
        "properties": {
            "email": {"type": "STRING", "description": "The email address of this user"},
            "name": {"type": "STRING", "description": "The user's name, if they provided it"},
            "notes": {
                "type": "STRING",
                "description": "Any additional information about the conversation that's worth recording to give context",
            },
        },
        "required": ["name", "email"],
    },
}

# Function declaration describing record_unknown_question to the model;
# its single parameter, "question", is required.
record_unknown_question_json = {
    "name": "record_unknown_question",
    "description": "Always use this tool to record any question that couldn't be answered as you didn't know the answer",
    "parameters": {
        "type": "OBJECT",
        "properties": {
            "question": {"type": "STRING", "description": "The question that couldn't be answered"},
        },
        "required": ["question"],
    },
}

# Function declarations handed to the Gemini model as its available tools.
tools = [record_user_details_json, record_unknown_question_json]



class App:
    """Chat backend that answers with Gemini, grounded on retrieved passages.

    Each user message is wrapped in a prompt containing passages retrieved
    from the loaded database, sent to Gemini together with the prior
    conversation, and any tool calls the model requests are executed and
    fed back until the model produces a text answer.
    """

    # Gemini model used for every chat turn.
    MODEL_NAME = 'gemini-2.0-flash'
    # Maximum number of tool-call round trips per user message, so a model
    # that keeps requesting tools cannot loop forever.
    MAX_TOOL_ROUNDS = 3

    def __init__(self, db_path: str = "Week_1/Data_w1", db_name: str = 'RAG_DB'):
        """Load the passage database used for retrieval.

        Args:
            db_path: Location passed to ``load_chroma_db`` as ``path``.
            db_name: Collection name passed to ``load_chroma_db`` as ``name``.
        """
        self.db = load_chroma_db(path=db_path, name=db_name)

    def rag_prompt(self, query: str, relevant_passages: str) -> str:
        """Build the retrieval-augmented prompt for one user question.

        Single and double quotes are stripped from the passage text and
        newlines are replaced by spaces before it is embedded in the prompt.

        Args:
            query: The user's question.
            relevant_passages: Retrieved passage text to answer from.

        Returns:
            The prompt string containing instructions, question and passage.
        """
        # One pass: drop both quote characters, turn newlines into spaces.
        escaped = relevant_passages.translate(
            str.maketrans({"'": None, '"': None, "\n": " "})
        )
        return f'''

            Please answer questions using text from the reference passage included below. \

            Be sure to respond in a complete sentence, being comprehensive, including all relevant background information. \

            However, you are talking to a non-technical audience, so be sure to break down complicated concepts and \

            strike a friendly and converstional tone. \

            If the passage is irrelevant to the question, you should respond with "I do not have an answer for that." and use record_unknown_question tool to record the question. \

            QUESTION: {query} \

            PASSAGE: {escaped}      

        '''

    def system_prompt(self) -> str:
        """Return the default system instruction for the assistant persona."""
        return '''

            You are acting as Ed Donner. You are answering questions on Ed Donner's website, \

            particularly questions related to Ed Donner's career, background, skills and experience. \

            Your responsibility is to represent Ed Donner for interactions on the website as faithfully as possible. \

            Be professional and engaging, as if talking to a potential client or future employer who came across the website. \

            If you don't know the answer to any question, use your record_unknown_question tool to record the question that you couldn't answer, even if it's about something trivial or unrelated to career. \

            If the user is engaging in discussion, try to steer them towards getting in touch via email; ask for their email and record it using your record_user_details tool.

        '''

    @staticmethod
    def _to_gemini_history(history):
        """Convert role/content message dicts into Gemini chat history.

        "user" entries keep their role, "assistant" entries become "model",
        and entries with any other role are skipped.
        """
        role_map = {"user": "user", "assistant": "model"}
        return [
            {"role": role_map[msg["role"]], "parts": [msg["content"]]}
            for msg in (history or [])
            if msg["role"] in role_map
        ]

    def chat_with_gemini(self, message, history, system_prompt):
        """Answer one user message, executing any tools the model requests.

        Args:
            message: The user's latest message.
            history: Prior conversation as a list of dicts with "role" and
                "content" keys.
            system_prompt: System instruction passed to the model.

        Returns:
            The model's text reply; an empty string if the model is still
            requesting tools after ``MAX_TOOL_ROUNDS`` round trips; or
            ``"Error: <details>"`` if anything raised along the way.
        """
        # UI boundary: report any failure as chat text instead of raising.
        try:
            # Create the model with system instruction
            model = genai.GenerativeModel(
                self.MODEL_NAME,
                system_instruction=system_prompt,
                tools=tools
            )
            # Start chat with history
            chat_session = model.start_chat(
                history=self._to_gemini_history(history)
            )

            relevant_passage = get_relevant_passage(query=message,
                                                    db=self.db,
                                                    n_results=3)
            # Build the prompt from the user's message. This previously read
            # `current_message` before it was assigned, which raised
            # UnboundLocalError on every call.
            current_message = self.rag_prompt(
                query=message,
                relevant_passages=" ".join(relevant_passage)
            )

            for _ in range(self.MAX_TOOL_ROUNDS):
                # Send the current message (prompt or tool results)
                response = chat_session.send_message(current_message)
                parts = response.candidates[0].content.parts
                print(f"Response parts: {list(parts)}")

                function_calls = []
                text_parts = []
                for part in parts:
                    if getattr(part, "function_call", None):
                        function_calls.append(part.function_call)
                        print("Function calls list not empty")
                    elif hasattr(part, "text"):
                        text_parts.append(part.text)

                if not function_calls:
                    # No tools requested: this is the final answer.
                    return "".join(text_parts) if text_parts else response.text

                # Execute the tools and feed their results back to the model.
                current_message = handle_tool_calls(function_calls)

            # The model kept asking for tools until the round limit.
            return ""
        except Exception as e:
            return f"Error: {e}"
        

if __name__ == "__main__":
    # Build the chat backend before the UI so its default system prompt
    # can pre-fill the textbox below.
    chat_grad = App()

    with gr.Blocks() as demo:
        gr.Markdown("# Chat with Google Gemini")

        # Editable system prompt; its current value is passed to
        # chat_with_gemini as the additional input after message and history.
        system_prompt = gr.Textbox(
            label="System Prompt",
            value=chat_grad.system_prompt(),
            lines=2,
            placeholder="Enter system instructions for the AI...",
        )

        chat_interface = gr.ChatInterface(
            fn=chat_grad.chat_with_gemini,
            type='messages',
            title="",
            additional_inputs=[system_prompt],
            cache_examples=False,
        )

    demo.launch()