|
|
from dotenv import load_dotenv
|
|
|
import os
|
|
|
import requests
|
|
|
import gradio as gr
|
|
|
from pypdf import PdfReader
|
|
|
import google.generativeai as genai
|
|
|
from chromadb import Documents, EmbeddingFunction, Embeddings
|
|
|
from typing import Dict, List
|
|
|
import numpy as np
|
|
|
from sklearn.metrics.pairwise import cosine_similarity
|
|
|
import re
|
|
|
import pickle
|
|
|
import json
|
|
|
from embed import *
|
|
|
# Load environment variables (.env wins over existing shell vars).
load_dotenv(override=True)

# Configure the Gemini SDK with the API key from the environment.
genai.configure(api_key=os.getenv("GEMINI_API"))

# Pushover credentials used by push() for notifications.
pushover_user = os.getenv("PUSHOVER_USER")

pushover_token = os.getenv("PUSHOVER_API")

# Plain string — the original used an f-string with no placeholders.
pushover_url = "https://api.pushover.net/1/messages.json"
|
|
|
|
|
|
|
|
|
def push(message: str) -> None:
    """Send *message* as a Pushover notification.

    Uses the module-level ``pushover_user`` / ``pushover_token`` credentials.

    Args:
        message: The notification text to deliver.
    """
    print("Pushing to Pushover ", message)

    payload = {"user": pushover_user, "token": pushover_token, "message": message}

    # Timeout added so a hung Pushover endpoint cannot block the chat
    # handler forever (requests has no default timeout).
    requests.post(pushover_url, data=payload, timeout=10)
|
|
|
|
|
|
def record_user_details(email: str,
                        name: str,
                        notes: str) -> Dict[str, str]:
    """Record a visitor's contact details by sending a Pushover notification.

    Args:
        email: The visitor's email address.
        name: The visitor's name.
        notes: Free-form context about the conversation.

    Returns:
        A confirmation dict: ``{"recorded": "ok"}``.
    """
    details = f"Email: {email}\nName: {name}\nNotes: {notes}"
    push(details)
    return {"recorded": "ok"}
|
|
|
|
|
|
|
|
|
def record_unknown_question(question: str) -> Dict[str, str]:
    """Log a question the assistant could not answer via Pushover.

    Args:
        question: The unanswered question text.

    Returns:
        A confirmation dict: ``{"recorded": "ok"}``.
    """
    notification = f"Question: {question}"
    push(notification)
    return {"recorded": "ok"}
|
|
|
|
|
|
|
|
|
def handle_tool_calls(tool_calls: List) -> List[Dict[str, str]]:
    """Execute Gemini function calls and package the results for the model.

    Args:
        tool_calls: Function-call objects from a Gemini response, each with
            a ``.name`` and a mapping-like ``.args``.

    Returns:
        A list of ``{"function_response": {"name": ..., "response": ...}}``
        dicts, one per tool call, in input order.
    """
    # Security: only the functions actually registered in `tools` may be
    # invoked. The original looked up *any* name in globals(), which would
    # let the model call arbitrary module-level objects (and crash on
    # non-callables such as the `tools` list itself).
    allowed = {"record_user_details", "record_unknown_question"}

    results = []
    for tool_call in tool_calls:
        tool_name = tool_call.name
        arguments = dict(tool_call.args)
        print(f"Tool called: {tool_name} with arguments: {arguments}")

        tool = globals().get(tool_name) if tool_name in allowed else None
        # Unknown/unregistered tools yield an empty response (best-effort,
        # matching the original behaviour) rather than raising.
        result = tool(**arguments) if callable(tool) else {}

        results.append({
            "function_response": {
                "name": tool_name,
                "response": result
            }
        })
    return results
|
|
|
|
|
|
# Gemini function-declaration schema for record_user_details.
record_user_details_json = {
    "name": "record_user_details",
    "description": "Use this tool to record that a user is interested in being in touch and provided an email address",
    "parameters": {
        "type": "OBJECT",
        "properties": {
            "email": {
                "type": "STRING",
                "description": "The email address of this user",
            },
            "name": {
                "type": "STRING",
                "description": "The user's name, if they provided it",
            },
            "notes": {
                "type": "STRING",
                "description": "Any additional information about the conversation that's worth recording to give context",
            },
        },
        "required": ["name", "email"],
    },
}

# Gemini function-declaration schema for record_unknown_question.
record_unknown_question_json = {
    "name": "record_unknown_question",
    "description": "Always use this tool to record any question that couldn't be answered as you didn't know the answer",
    "parameters": {
        "type": "OBJECT",
        "properties": {
            "question": {
                "type": "STRING",
                "description": "The question that couldn't be answered",
            },
        },
        "required": ["question"],
    },
}

# Tool declarations handed to genai.GenerativeModel(tools=...).
tools = [
    record_user_details_json,
    record_unknown_question_json,
]
|
|
|
|
|
|
|
|
|
|
|
|
class App:
    """Gradio chat application that answers as Ed Donner.

    Combines Gemini chat with retrieval from a Chroma vector store
    (loaded via helpers from ``embed``) and two Pushover-backed tools.
    """

    def __init__(self):
        # Vector store holding the reference documents used for retrieval.
        # NOTE(review): load_chroma_db comes from `from embed import *`;
        # path/name are hard-coded to this project's layout.
        self.db = load_chroma_db(path="Week_1/Data_w1", name='RAG_DB')

    def rag_prompt(self, query: str, relevant_passages: str) -> str:
        """Build the grounded prompt pairing *query* with retrieved passages.

        Args:
            query: The user's question.
            relevant_passages: Retrieved passage text (already joined).

        Returns:
            The full prompt string to send to the model.
        """
        # Strip quotes and newlines so passage text cannot break the
        # prompt's formatting.
        escaped = relevant_passages.replace("'", "").replace('"', "").replace("\n", " ")
        prompt = f'''
Please answer questions using text from the reference passage included below. \
Be sure to respond in a complete sentence, being comprehensive, including all relevant background information. \
However, you are talking to a non-technical audience, so be sure to break down complicated concepts and \
strike a friendly and conversational tone. \
If the passage is irrelevant to the question, you should respond with "I do not have an answer for that." and use record_unknown_question tool to record the question. \
QUESTION: {query} \
PASSAGE: {escaped}
'''
        return prompt

    def system_prompt(self) -> str:
        """Return the persona/system instruction shown in the UI textbox."""
        return '''
You are acting as Ed Donner. You are answering questions on Ed Donner's website, \
particularly questions related to Ed Donner's career, background, skills and experience. \
Your responsibility is to represent Ed Donner for interactions on the website as faithfully as possible. \
Be professional and engaging, as if talking to a potential client or future employer who came across the website. \
If you don't know the answer to any question, use your record_unknown_question tool to record the question that you couldn't answer, even if it's about something trivial or unrelated to career. \
If the user is engaging in discussion, try to steer them towards getting in touch via email; ask for their email and record it using your record_user_details tool.
'''

    def chat_with_gemini(self, message, history, system_prompt):
        """Gradio ChatInterface callback: answer *message* with RAG + tools.

        Args:
            message: The latest user message.
            history: Prior turns in Gradio "messages" format
                (list of ``{"role": ..., "content": ...}`` dicts).
            system_prompt: System instruction text from the UI textbox.

        Returns:
            The model's text reply, or an ``"Error: ..."`` string on failure
            (this is a UI boundary, so exceptions are reported, not raised).
        """
        try:
            model = genai.GenerativeModel(
                'gemini-2.0-flash',
                system_instruction=system_prompt,
                tools=tools
            )

            # Translate Gradio "messages" history into Gemini's format
            # (Gemini uses role "model" for assistant turns).
            gemini_history = []
            for msg in history:
                if msg["role"] == "user":
                    gemini_history.append({"role": "user", "parts": [msg["content"]]})
                elif msg["role"] == "assistant":
                    gemini_history.append({"role": "model", "parts": [msg["content"]]})

            chat_session = model.start_chat(history=gemini_history)

            # Retrieve supporting passages for the current user message.
            relevant_passage = get_relevant_passage(query=message,
                                                    db=self.db,
                                                    n_results=3)

            # BUG FIX: build the RAG prompt from the incoming `message`.
            # The original passed `current_message` here before it was ever
            # assigned, raising NameError on every call.
            current_message = self.rag_prompt(query=message,
                                              relevant_passages=" ".join(relevant_passage))

            max_iteration = 3  # cap tool-call round-trips to avoid loops
            iteration = 0
            try:
                while iteration < max_iteration:
                    response = chat_session.send_message(current_message)

                    print(f"Response parts: {[part for part in response.candidates[0].content.parts]}")

                    # A candidate may interleave text and function-call parts.
                    function_calls = []
                    text_parts = []
                    for part in response.candidates[0].content.parts:
                        if hasattr(part, "function_call") and part.function_call:
                            function_calls.append(part.function_call)
                            print("Function calls list not empty")
                        elif hasattr(part, "text"):
                            text_parts.append(part.text)

                    if function_calls:
                        # Execute the requested tools and feed the results
                        # back to the model for another round.
                        results = handle_tool_calls(function_calls)
                        current_message = results
                        iteration += 1
                    else:
                        # No tool calls: return the model's text answer.
                        if text_parts:
                            return "".join(text_parts)
                        return response.text
                # Tool-call budget exhausted without a final text answer.
                return ""
            except Exception as e:
                return f"Error: {e}"
        except Exception as e:
            return f"Error: {e}"
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Instantiate the app (loads the Chroma DB) and build the Gradio UI.
    app = App()

    with gr.Blocks() as demo:
        gr.Markdown("# Chat with Google Gemini")

        # Editable system prompt, pre-filled with the persona instructions.
        system_prompt_box = gr.Textbox(
            value=app.system_prompt(),
            label="System Prompt",
            placeholder="Enter system instructions for the AI...",
            lines=2,
        )

        # Chat UI wired to the Gemini callback; the textbox is passed as an
        # additional input so edits take effect per message.
        gr.ChatInterface(
            fn=app.chat_with_gemini,
            additional_inputs=[system_prompt_box],
            title="",
            cache_examples=False,
            type='messages',
        )

        demo.launch()
|
|
|
|