# Ayush_Alter_Ego / app.py
# Ayush239's picture
# Update app.py
# f1dfb77 verified
# raw
# history blame
# 4.98 kB
import os
import json
import requests
from pypdf import PdfReader
from openai import OpenAI
import gradio as gr
# Settings are read once at import time.
# The chat model is reached through an NVIDIA-hosted endpoint using the OpenAI
# SDK; the API key comes from the OPENAI_API_KEY environment variable.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
BASE_URL = "https://integrate.api.nvidia.com/v1"
MODEL = "meta/llama3-8b-instruct"

# Pushover credentials; each is None when its environment variable is unset.
PUSHOVER_TOKEN = os.getenv("PUSHOVER_TOKEN")
PUSHOVER_USER = os.getenv("PUSHOVER_USER")
# Shared OpenAI-SDK client, pointed at BASE_URL rather than the SDK's default host.
client = OpenAI(base_url=BASE_URL, api_key=OPENAI_API_KEY)
def push(text):
    """Send ``text`` as a Pushover notification on a best-effort basis.

    When PUSHOVER_TOKEN or PUSHOVER_USER is unset, the message is only printed.
    Any delivery failure (network error or an HTTP error status) is printed
    and swallowed, so a notification problem never propagates to the caller.

    Args:
        text: Message body to deliver.

    Returns:
        None.
    """
    if not PUSHOVER_TOKEN or not PUSHOVER_USER:
        print("Pushover not configured:", text)
        return

    try:
        response = requests.post(
            "https://api.pushover.net/1/messages.json",
            data={
                "token": PUSHOVER_TOKEN,
                "user": PUSHOVER_USER,
                "message": text,
            },
            timeout=10,
        )
        # requests does not raise on 4xx/5xx by itself; without this check a
        # rejected message (e.g. bad token) would be silently treated as sent.
        response.raise_for_status()
    except Exception as e:
        # Deliberately broad: notifications are best effort and must not
        # break the chat turn that triggered them.
        print("Pushover failed:", e)
def record_user_details(email, name="Name not provided", notes="not provided"):
    """Push a "Lead" notification and echo the recorded details back.

    Args:
        email: Contact e-mail supplied by the user.
        name: User's name; defaults to a placeholder when not given.
        notes: Free-form notes; defaults to a placeholder when not given.

    Returns:
        A dict with ``status`` set to ``"ok"`` plus the three input values.
    """
    lead_message = f"Lead → {name} | {email} | {notes}"
    push(lead_message)
    return dict(status="ok", email=email, name=name, notes=notes)
def record_unknown_question(question):
    """Push an "Unknown" notification for a question and echo it back.

    Args:
        question: The question text to record.

    Returns:
        A dict with ``status`` set to ``"ok"`` and the original ``question``.
    """
    unknown_message = f"Unknown → {question}"
    push(unknown_message)
    return dict(status="ok", question=question)
# Bind both tool handlers in the module namespace under their own names; the
# chat loop resolves a requested tool by looking its name up in globals().
globals().update(
    record_user_details=record_user_details,
    record_unknown_question=record_unknown_question,
)
def _function_tool(name, description, string_params, required):
    """Build one function-tool schema whose parameters are all strings."""
    return {
        "type": "function",
        "function": {
            "name": name,
            "description": description,
            "parameters": {
                "type": "object",
                # A fresh {"type": "string"} dict per parameter, so entries
                # never alias each other.
                "properties": {param: {"type": "string"} for param in string_params},
                "required": list(required),
            },
        },
    }


# Tool schemas sent with every chat-completion request; each "name" matches a
# module-level handler function of the same name.
tools = [
    _function_tool(
        "record_user_details",
        "Record user's interest and email.",
        ("email", "name", "notes"),
        ("email",),
    ),
    _function_tool(
        "record_unknown_question",
        "Record an unknown question.",
        ("question",),
        ("question",),
    ),
]
class Me:
    """Chat persona that answers questions as a specific person via an LLM.

    On construction the optional summary text file and LinkedIn PDF export are
    loaded (each is skipped when the file does not exist) and later embedded
    in the system prompt. ``chat`` runs the completion/tool-call loop.
    """

    # Upper bound on tool-call rounds within one chat turn, so a model that
    # keeps requesting tools cannot keep the request loop running forever.
    MAX_TOOL_ROUNDS = 5

    def __init__(
        self,
        summary_path="me/summary.txt",
        linkedin_pdf_path="me/Ayush_linkdin.pdf",
        name="Ayush Tyagi",
    ):
        """Load the profile material used to build the system prompt.

        Args:
            summary_path: UTF-8 text file with a free-form summary.
            linkedin_pdf_path: PDF whose extracted page text is used as the
                LinkedIn section of the prompt.
            name: Name the assistant acts as.
        """
        self.name = name
        self.summary = ""
        self.linkedin_text = ""

        if os.path.exists(summary_path):
            # Context manager closes the file; the previous bare open().read()
            # left the handle to be reclaimed by the garbage collector.
            with open(summary_path, "r", encoding="utf-8") as handle:
                self.summary = handle.read()

        if os.path.exists(linkedin_pdf_path):
            reader = PdfReader(linkedin_pdf_path)
            page_texts = (page.extract_text() for page in reader.pages)
            # Pages for which no text was extracted are skipped.
            self.linkedin_text = "\n\n".join(t for t in page_texts if t)

    def system_prompt(self):
        """Return the system prompt embedding the name, summary and LinkedIn text."""
        return (
            "\n"
            f"You are acting as {self.name}. You answer questions about his "
            "background, skills and experience.\n"
            "If you don't know something, use record_unknown_question.\n"
            "If user is interested, ask for email and use record_user_details.\n"
            "Summary:\n"
            f"{self.summary}\n"
            "LinkedIn:\n"
            f"{self.linkedin_text}\n"
        )

    @staticmethod
    def _history_messages(history):
        """Convert chat history into a list of role/content message dicts.

        Accepts both ``(user, assistant)`` pairs and dicts carrying ``role``
        and ``content``; entries of any other shape are ignored.
        """
        converted = []
        for item in history or []:
            if isinstance(item, (list, tuple)) and len(item) == 2:
                user_text, assistant_text = item
                if user_text:
                    converted.append({"role": "user", "content": user_text})
                if assistant_text:
                    converted.append({"role": "assistant", "content": assistant_text})
            elif isinstance(item, dict) and "role" in item and "content" in item:
                # Copy only role/content so any extra keys are not forwarded.
                converted.append({"role": item["role"], "content": item["content"]})
        return converted

    @staticmethod
    def _run_tool(name, raw_arguments):
        """Execute one model-requested tool call and return its result dict.

        Only names declared in the module-level ``tools`` schema are callable;
        the model's output is untrusted and must not be able to invoke an
        arbitrary module global. Unknown tools and malformed arguments yield
        an error dict (sent back to the model) instead of raising.
        """
        declared = {spec["function"]["name"] for spec in tools}
        handler = globals().get(name) if name in declared else None
        if not callable(handler):
            return {"status": "error", "error": f"unknown tool: {name}"}

        try:
            arguments = json.loads(raw_arguments or "{}")
        except json.JSONDecodeError as exc:
            return {"status": "error", "error": f"invalid JSON arguments: {exc}"}
        if not isinstance(arguments, dict):
            return {"status": "error", "error": "tool arguments must be a JSON object"}

        try:
            return handler(**arguments)
        except TypeError as exc:
            # Missing or unexpected keyword arguments supplied by the model.
            return {"status": "error", "error": f"invalid arguments: {exc}"}

    def chat(self, message, history):
        """Answer ``message`` given prior ``history``, running tool calls as needed.

        Args:
            message: The new user message.
            history: Prior conversation; see ``_history_messages`` for the
                accepted entry shapes.

        Returns:
            The assistant's reply text. An empty string is returned when the
            model produced no content, and a fixed apology when the model is
            still requesting tools after ``MAX_TOOL_ROUNDS`` rounds.
        """
        messages = [{"role": "system", "content": self.system_prompt()}]
        messages.extend(self._history_messages(history))
        messages.append({"role": "user", "content": message})

        for _ in range(self.MAX_TOOL_ROUNDS):
            response = client.chat.completions.create(
                model=MODEL,
                messages=messages,
                tools=tools,
                tool_choice="auto",
                max_tokens=600,
            )
            msg = response.choices[0].message

            # Decide on the presence of tool calls rather than finish_reason
            # alone: a "tool_calls" finish reason with no calls would make the
            # loop below iterate None, and calls reported under another finish
            # reason would otherwise be dropped.
            if not msg.tool_calls:
                return msg.content or ""

            # The assistant message carrying the tool calls must precede the
            # tool results in the transcript.
            messages.append(msg.model_dump())
            for tool_call in msg.tool_calls:
                result = self._run_tool(
                    tool_call.function.name,
                    tool_call.function.arguments,
                )
                messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": json.dumps(result),
                })

        return "Sorry, I couldn't complete that request. Please try again."
# Build the persona once at import time and expose its chat method through a
# Gradio chat UI.
me = Me()
ui = gr.ChatInterface(fn=me.chat, title="Ayush Tyagi — Personal Assistant", type="messages")

if __name__ == "__main__":
    # Listen on all interfaces; the port comes from PORT, defaulting to 7860.
    port = int(os.getenv("PORT", 7860))
    ui.launch(server_name="0.0.0.0", server_port=port)