| from dotenv import load_dotenv |
| from openai import OpenAI |
| import json |
| import os |
| import requests |
| from pypdf import PdfReader |
| import gradio as gr |
| import base64 |
| import time |
| from collections import defaultdict |
| import fastapi |
| from gradio.context import Context |
| import logging |
|
|
| logger = logging.getLogger(__name__) |
| logger.setLevel(logging.DEBUG) |
|
|
|
|
| load_dotenv(override=True) |
|
|
| class RateLimiter: |
| def __init__(self, max_requests=5, time_window=5): |
| |
| self.max_requests = max_requests |
| self.time_window = time_window |
| self.request_history = defaultdict(list) |
| |
| def is_rate_limited(self, user_id): |
| current_time = time.time() |
| |
| self.request_history[user_id] = [ |
| timestamp for timestamp in self.request_history[user_id] |
| if current_time - timestamp < self.time_window |
| ] |
| |
| |
| if len(self.request_history[user_id]) >= self.max_requests: |
| return True |
| |
| |
| self.request_history[user_id].append(current_time) |
| return False |
|
|
| def push(text): |
| requests.post( |
| "https://api.pushover.net/1/messages.json", |
| data={ |
| "token": os.getenv("PUSHOVER_TOKEN"), |
| "user": os.getenv("PUSHOVER_USER"), |
| "message": text, |
| } |
| ) |
|
|
| def send_email(from_email, name, notes): |
| auth = base64.b64encode(f'api:{os.getenv("MAILGUN_API_KEY")}'.encode()).decode() |
| |
| response = requests.post( |
| f'https://api.mailgun.net/v3/{os.getenv("MAILGUN_DOMAIN")}/messages', |
| headers={ |
| 'Authorization': f'Basic {auth}' |
| }, |
| data={ |
| 'from': f'Website Contact <mailgun@{os.getenv("MAILGUN_DOMAIN")}>', |
| 'to': os.getenv("MAILGUN_RECIPIENT"), |
| 'subject': f'New message from {from_email}', |
| 'text': f'Name: {name}\nEmail: {from_email}\nNotes: {notes}', |
| 'h:Reply-To': from_email |
| } |
| ) |
| |
| return response.status_code == 200 |
|
|
|
|
| def record_user_details(email, name="Name not provided", notes="not provided"): |
| push(f"Recording {name} with email {email} and notes {notes}") |
| |
| email_sent = send_email(email, name, notes) |
| return {"recorded": "ok", "email_sent": email_sent} |
|
|
| def record_unknown_question(question): |
| push(f"Recording {question}") |
| return {"recorded": "ok"} |
|
|
| record_user_details_json = { |
| "name": "record_user_details", |
| "description": "Use this tool to record that a user is interested in being in touch and provided an email address", |
| "parameters": { |
| "type": "object", |
| "properties": { |
| "email": { |
| "type": "string", |
| "description": "The email address of this user" |
| }, |
| "name": { |
| "type": "string", |
| "description": "The user's name, if they provided it" |
| } |
| , |
| "notes": { |
| "type": "string", |
| "description": "Any additional information about the conversation that's worth recording to give context" |
| } |
| }, |
| "required": ["email"], |
| "additionalProperties": False |
| } |
| } |
|
|
| record_unknown_question_json = { |
| "name": "record_unknown_question", |
| "description": "Always use this tool to record any question that couldn't be answered as you didn't know the answer", |
| "parameters": { |
| "type": "object", |
| "properties": { |
| "question": { |
| "type": "string", |
| "description": "The question that couldn't be answered" |
| }, |
| }, |
| "required": ["question"], |
| "additionalProperties": False |
| } |
| } |
|
|
| tools = [{"type": "function", "function": record_user_details_json}, |
| {"type": "function", "function": record_unknown_question_json}] |
|
|
|
|
| class Me: |
|
|
| def __init__(self): |
| self.openai = OpenAI(api_key=os.getenv("GOOGLE_API_KEY"), base_url="https://generativelanguage.googleapis.com/v1beta/openai/") |
| self.name = "Sagarnil Das" |
| self.rate_limiter = RateLimiter(max_requests=5, time_window=60) |
| reader = PdfReader("me/linkedin.pdf") |
| self.linkedin = "" |
| for page in reader.pages: |
| text = page.extract_text() |
| if text: |
| self.linkedin += text |
| with open("me/summary.txt", "r", encoding="utf-8") as f: |
| self.summary = f.read() |
|
|
|
|
| def handle_tool_call(self, tool_calls): |
| results = [] |
| for tool_call in tool_calls: |
| tool_name = tool_call.function.name |
| arguments = json.loads(tool_call.function.arguments) |
| print(f"Tool called: {tool_name}", flush=True) |
| tool = globals().get(tool_name) |
| result = tool(**arguments) if tool else {} |
| results.append({"role": "tool","content": json.dumps(result),"tool_call_id": tool_call.id}) |
| return results |
| |
| def system_prompt(self): |
| system_prompt = f"You are acting as {self.name}. You are answering questions on {self.name}'s website, \ |
| particularly questions related to {self.name}'s career, background, skills and experience. \ |
| Your responsibility is to represent {self.name} for interactions on the website as faithfully as possible. \ |
| You are given a summary of {self.name}'s background and LinkedIn profile which you can use to answer questions. \ |
| Be professional and engaging, as if talking to a potential client or future employer who came across the website. \ |
| If you don't know the answer to any question, use your record_unknown_question tool to record the question that you couldn't answer, even if it's about something trivial or unrelated to career. \ |
| If the user is engaging in discussion, try to steer them towards getting in touch via email; ask for their email and record it using your record_user_details tool. \ |
| When a user provides their email, both a push notification and an email notification will be sent. If the user does not provide any note in the message \ |
| in which they provide their email, then give a summary of the conversation so far as the notes." |
|
|
| system_prompt += f"\n\n## Summary:\n{self.summary}\n\n## LinkedIn Profile:\n{self.linkedin}\n\n" |
| system_prompt += f"With this context, please chat with the user, always staying in character as {self.name}." |
| return system_prompt |
| |
| def chat(self, message, history): |
| |
| try: |
| |
| request = Context.get_context().request |
| |
| forwarded_for = request.headers.get("X-Forwarded-For") |
| |
| cloudflare_ip = request.headers.get("Cf-Connecting-IP") |
| |
| if forwarded_for: |
| |
| user_id = forwarded_for.split(",")[0].strip() |
| elif cloudflare_ip: |
| user_id = cloudflare_ip |
| else: |
| |
| user_id = request.client.host |
| except (AttributeError, RuntimeError, fastapi.exceptions.FastAPIError): |
| |
| user_id = "default_user" |
| logger.debug(f"User ID: {user_id}") |
| if self.rate_limiter.is_rate_limited(user_id): |
| return "You're sending messages too quickly. Please wait a moment before sending another message." |
| |
| messages = [{"role": "system", "content": self.system_prompt()}] |
|
|
| |
| if isinstance(history, list) and all(isinstance(h, dict) for h in history): |
| messages.extend(history) |
| else: |
| |
| for user_msg, assistant_msg in history: |
| messages.append({"role": "user", "content": user_msg}) |
| messages.append({"role": "assistant", "content": assistant_msg}) |
|
|
| messages.append({"role": "user", "content": message}) |
|
|
| done = False |
| while not done: |
| response = self.openai.chat.completions.create( |
| model="gemini-2.0-flash", |
| messages=messages, |
| tools=tools |
| ) |
| if response.choices[0].finish_reason == "tool_calls": |
| tool_calls = response.choices[0].message.tool_calls |
| tool_result = self.handle_tool_call(tool_calls) |
| messages.append(response.choices[0].message) |
| messages.extend(tool_result) |
| else: |
| done = True |
|
|
| return response.choices[0].message.content |
|
|
| |
|
|
| if __name__ == "__main__": |
| me = Me() |
| gr.ChatInterface(me.chat, type="messages").launch() |
| |