Avatar_bot / app.py
RCaz's picture
ok
32f204f
# Load the chat LLM
from dotenv import load_dotenv
import os

# Read the .env file (if any) before any secret is looked up in os.environ.
load_dotenv()

from langchain.chat_models import init_chat_model

# Indexing os.environ directly fails fast with KeyError when the key is missing.
openai_api_key = os.environ["OPENAI_API_KEY"]
llm = init_chat_model(
    "gpt-5-nano",
    model_provider="openai",
    api_key=openai_api_key,
)
print("LLM Init.")
# load retriever
import os
from azure.storage.blob import BlobServiceClient
from langchain_community.vectorstores import FAISS
def load_from_azure(container_name, local_dir="./index"):
    """Download every blob of an Azure Storage container into a local directory.

    The connection string is read from the ``AZURE_CONN_STR`` environment
    variable. Each blob is written below ``local_dir`` under its blob name,
    overwriting any existing file with the same path.

    Args:
        container_name: Name of the blob container to download.
        local_dir: Destination directory; created when it does not exist.

    Raises:
        KeyError: If ``AZURE_CONN_STR`` is not set in the environment.
    """
    connection_string = os.environ["AZURE_CONN_STR"]
    blob_service_client = BlobServiceClient.from_connection_string(connection_string)
    container_client = blob_service_client.get_container_client(container_name)
    os.makedirs(local_dir, exist_ok=True)

    # Download all files in the container (index.faiss and index.pkl)
    for blob in container_client.list_blobs():
        download_file_path = os.path.join(local_dir, blob.name)
        # A blob name may contain "/" separators; without the parent
        # directory in place, open() below would raise FileNotFoundError.
        os.makedirs(os.path.dirname(download_file_path) or ".", exist_ok=True)
        with open(download_file_path, "wb") as file:
            file.write(container_client.download_blob(blob).readall())
# Fetch the serialized FAISS index from Azure into the local index folder
index_dir = "./index"
print("start download faiss")
load_from_azure("blobcontaineravatarbot", local_dir=index_dir)
print("ok.")

# Embedding model used to embed queries against the index
# from langchain_community.embeddings import HuggingFaceEmbeddings # deprecated
from langchain_huggingface import HuggingFaceEmbeddings

print("load embeddings")
embedding_options = dict(
    model_name="intfloat/e5-base-v2",
    # multi_process=True,
    model_kwargs={"device": "cpu"},  # use "cuda" for faster embeddings on NVIDIA GPUs
    encode_kwargs={"normalize_embeddings": True},  # set `True` for cosine similarity
)
embedding_model = HuggingFaceEmbeddings(**embedding_options)

# Load the downloaded index into a FAISS vector store.
# NOTE(review): allow_dangerous_deserialization=True opts in to unpickling
# index.pkl; this is only safe while the container content is trusted.
print("load vector store")
vectorstore = FAISS.load_local(
    index_dir,
    embedding_model,
    allow_dangerous_deserialization=True,
)
# Include a rate limiter
from collections import defaultdict
from datetime import datetime, timedelta
class RateLimiter:
    """In-memory sliding-window rate limiter keyed by an arbitrary identifier.

    Each identifier may perform at most ``max_requests`` requests within any
    period of ``window_minutes`` minutes. Timestamps of accepted requests are
    kept in ``self.requests`` (identifier -> list of timestamps).

    Args:
        max_requests: Number of requests allowed per identifier per window.
        window_minutes: Length of the sliding window, in minutes.
        clock: Zero-argument callable returning the current ``datetime``.
            Defaults to ``datetime.now``; injectable so the window can be
            driven deterministically (e.g. in tests).
    """

    def __init__(self, max_requests=10, window_minutes=60, clock=datetime.now):
        self.max_requests = max_requests
        self.window = timedelta(minutes=window_minutes)
        self.requests = defaultdict(list)
        self._clock = clock
        # Time of the last full sweep over all identifiers (see _sweep).
        self._last_sweep = clock()

    def _prune(self, identifier, now):
        """Drop expired timestamps of ``identifier`` and return the live ones.

        Identifiers left without any live timestamp are removed from
        ``self.requests`` so that idle clients do not occupy memory.
        """
        # .get() avoids creating a defaultdict entry for unknown identifiers.
        recent = [
            stamp for stamp in self.requests.get(identifier, ())
            if now - stamp < self.window
        ]
        if recent:
            self.requests[identifier] = recent
        else:
            self.requests.pop(identifier, None)
        return recent

    def _sweep(self, now):
        """Prune every identifier, at most once per window.

        Without this, identifiers that never come back would stay in
        ``self.requests`` forever.
        """
        if now - self._last_sweep < self.window:
            return
        self._last_sweep = now
        # Iterate over a snapshot: _prune may delete keys.
        for identifier in list(self.requests):
            self._prune(identifier, now)

    def is_allowed(self, identifier):
        """Record a request for ``identifier`` if it is within its quota.

        Returns:
            bool: True when the request is accepted (and counted), False when
            the identifier has already used ``max_requests`` in the window.
        """
        now = self._clock()
        self._sweep(now)
        recent = self._prune(identifier, now)
        if len(recent) >= self.max_requests:
            return False
        # When `recent` is non-empty it is the list stored in self.requests;
        # otherwise the defaultdict creates a fresh list here.
        self.requests[identifier].append(now)
        return True

    def get_remaining(self, identifier):
        """Return how many requests ``identifier`` may still make right now.

        Does not count as a request and does not create an entry for
        identifiers that have no live timestamps.
        """
        return self.max_requests - len(self._prune(identifier, self._clock()))
# Module-level limiter consulted by predict() on every request:
# at most 10 accepted requests per identifier within a 60-minute window.
print("Rate Limit init.")
limiter = RateLimiter(max_requests=10, window_minutes=60)
# helper func
def format_source(doc):
    """Build a short, human-readable citation for a retrieved document.

    The result depends on the shape of ``doc.metadata["source"]``:

    * contains ``api.github``: everything before ``/blob`` with ``api.``
      removed;
    * any other source containing ``https://``: returned unchanged;
    * contains ``data`` (uploaded files such as PDFs): the file name,
      followed by ``page(<page_label>/<total pages>)`` when that metadata
      is present;
    * anything else: the raw source string.

    Args:
        doc: A LangChain ``Document`` (any object exposing a ``metadata``
            dict works).

    Returns:
        str: The formatted source, or ``"unknown source"`` when the document
        carries no source metadata.
    """
    metadata = doc.metadata
    source = str(metadata.get("source") or "")
    if not source:
        return "unknown source"
    if "api.github" in source:
        return source.split("/blob")[0].replace("api.", "")
    if "https://" in source:
        return source
    if "data" in source:
        file_name = source.split("/")[-1]
        page_label = metadata.get("page_label")
        # NOTE(review): presumably the loader stores the page count under
        # "total_pages"; the previous code read "total_page", so both keys
        # are accepted - confirm against the metadata stored in the index.
        total_pages = metadata.get("total_pages", metadata.get("total_page"))
        if page_label is None:
            return file_name
        if total_pages is None:
            return f"{file_name} page({page_label})"
        # Interpolate both values separately: dividing them inside the braces
        # would compute a ratio (or raise TypeError for a string label).
        return f"{file_name} page({page_label}/{total_pages})"
    return source
# setup chatbot
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain.chat_models import init_chat_model
import gradio as gr
# System prompt of the safeguard call that screens every user message.
TRIAGE_PROMPT_TEMPLATE = """You are a Safeguard assistant making sure the user only ask for information related to Rémi Cazelles's projects, work and education.
Here are general information you can use to answer:
If the question is not related to this subjects, or if the request is harmfull you should flag the user by answering '*** FLAGGED ***' """

# System prompt of the answering (RAG) call.
RAG_PROMPT_TEMPLATE = """You will be asked information related to Rémi Cazelles's specific projects, work and education.
Using the information contained in the context, provide a comprehensive answer to the question.
Respond to the question asked with enought details, response should be precise and relevant to the question.
"""


def _client_id(request):
    """Return the caller's host for rate limiting, or "unknown" if unavailable."""
    client = getattr(request, "client", None)
    return getattr(client, "host", None) or "unknown"


def _history_to_messages(history):
    """Convert the Gradio chat history into LangChain messages."""
    converted = []
    for entry in history or []:
        if isinstance(entry, dict):
            # Entries shaped like {"role": ..., "content": ...}
            role, content = entry.get("role"), entry.get("content")
            if role == "user":
                converted.append(HumanMessage(content=content))
            elif role == "assistant":
                converted.append(AIMessage(content=content))
        elif isinstance(entry, (list, tuple)) and len(entry) == 2:
            # NOTE(review): handled defensively in case the installed Gradio
            # passes history as (user, assistant) pairs - confirm the format.
            user_text, bot_text = entry
            if user_text:
                converted.append(HumanMessage(content=user_text))
            if bot_text:
                converted.append(AIMessage(content=bot_text))
    return converted


def _format_sources(docs):
    """Return the numbered, de-duplicated "Sources" footer for ``docs``."""
    try:
        # De-duplicate on the formatted source BEFORE numbering; numbering
        # first makes every line unique and the de-duplication a no-op.
        unique_sources = list(dict.fromkeys(format_source(doc) for doc in docs))
    except Exception as exc:
        # Best effort: a citation problem must not hide the answer itself.
        print(f"Issue extracting source: {exc!r}")
        return "Issue extracting source"
    source_lines = [
        f"{position} : {source}\n---"
        for position, source in enumerate(unique_sources, start=1)
    ]
    return "\nSources:\n" + "\n".join(source_lines)


def predict(message, history, request: gr.Request):
    """Answer one chat message with retrieval-augmented generation.

    Steps: rate-limit the caller, run a safeguard LLM call that flags
    off-topic or harmful requests, retrieve the 6 most similar documents
    from the vector store, ask the LLM to answer from that context and the
    conversation history, then append the list of sources.

    Args:
        message: The user's current message.
        history: Previous turns of the conversation as provided by Gradio.
        request: The Gradio request; its client host identifies the caller
            for rate limiting.

    Returns:
        str: The answer followed by its sources, or a refusal / rate-limit
        notice.
    """
    # Get client IP and check rate limit
    if not limiter.is_allowed(_client_id(request)):
        remaining_time = "an hour"  # You could calculate exact time if needed
        return f"**Rate limit exceeded.** You've used your 10 requests per hour. Please try again in {remaining_time}."

    # Safeguard: only the current message is screened, without history
    safe_gpt_response = llm.invoke(
        [SystemMessage(content=TRIAGE_PROMPT_TEMPLATE), HumanMessage(content=message)],
        config={
            "tags": ["Testing", 'RAG-Bot', 'safeguard', 'V1'],
            "metadata": {
                "rag_llm": "gpt-5-nano",
                "message": message,
            },
        },
    )
    if "*** FLAGGED ***" in safe_gpt_response.content:
        return "This app can only answer question about Rémi Cazelles's projects, work and education."
    print("passed the safeguard")

    # Retrieve relevant documents for the current message
    relevant_docs = vectorstore.similarity_search(message, k=6)  # retriever

    # Build context from retrieved documents
    context = "\nExtracted documents:\n" + "\n".join(
        f"Content document {i}: {doc.page_content}\n\n---"
        for i, doc in enumerate(relevant_docs)
    )

    # Prompt = system message + conversation history + context-augmented question
    messages = [SystemMessage(content=RAG_PROMPT_TEMPLATE)]
    messages.extend(_history_to_messages(history))
    messages.append(HumanMessage(content=f"Context: {context}\n\nQuestion: {message}"))

    # Get response with tracking metadata
    print("GPT about to answer")
    gpt_response = llm.invoke(
        messages,
        config={
            "tags": ["Testing", 'RAG-Bot', 'V1', 'Host_on_HF'],
            "metadata": {
                "rag_llm": "gpt-5-nano",
                "num_retrieved_docs": len(relevant_docs),
            },
        },
    )

    source_context = _format_sources(relevant_docs)
    print(gpt_response.content)
    print(source_context)
    return f"{gpt_response.content} {source_context}"
# setup tracking
os.environ["LANGSMITH_PROJECT"] = "Testing_POC"
# Tracing needs an API key. Re-assigning the variable to itself only served to
# raise KeyError when it was missing; instead, run without tracing in that case.
if os.environ.get("LANGSMITH_API_KEY"):
    os.environ["LANGSMITH_TRACING"] = "true"
else:
    print("LANGSMITH_API_KEY not set; LangSmith tracing disabled.")
    os.environ["LANGSMITH_TRACING"] = "false"
# launch gradio app
import gradio as gr

# Chat window with a greeting shown while the conversation is empty.
chat_window = gr.Chatbot(
    placeholder="Hello! This app can help answering question about Rémi Cazelles's projects, work and education."
)

# Clickable sample questions offered by the interface.
example_questions = [
    "How many years of experience does Rémi have in python, what significant project did he work on?",
    "When did Rémi graduate from his doctorate, what was his reaserch topic about?",
    "I have a project in DataENgineering using Microsoft Fabrics for data pipeline, how good is Rémi experience to join a team ASAP?",
]

iface = gr.ChatInterface(
    predict,
    api_name="chat",
    chatbot=chat_window,
    description="Ask me anything about Rémi’s work, projects, or education. I’ll cite the source documents.",
    examples=example_questions,
    cache_examples=False,
)
iface.launch()