Avatar_bot / app.py
RCaz's picture
added helper function format_source to handle GitHub API, HTTP and uploaded files
3dcfb5a
raw
history blame
8.23 kB
# load llm
# load_dotenv() is called before the API key is read from os.environ below,
# so the statement order in this section matters.
from dotenv import load_dotenv
import os
load_dotenv()
from langchain.chat_models import init_chat_model
# Module-level chat model used by predict() for both the safeguard call and
# the final answer. os.environ[...] raises KeyError here if OPENAI_API_KEY
# is not set.
llm = init_chat_model("gpt-5-nano",
                      model_provider="openai",
                      api_key=os.environ['OPENAI_API_KEY'])
print("LLM Init.")
# load retriever
import os
from azure.storage.blob import BlobServiceClient
from langchain_community.vectorstores import FAISS
def load_from_azure(container_name, local_dir="./index"):
    """Download every blob of an Azure Storage container into a local directory.

    The connection string is read from the ``AZURE_CONN_STR`` environment
    variable. Each blob is written to ``local_dir/<blob name>``, overwriting
    any existing file of the same name.

    Args:
        container_name: name of the container to download.
        local_dir: destination directory; created if it does not exist.

    Raises:
        KeyError: if ``AZURE_CONN_STR`` is not set.
    """
    connection_string = os.environ["AZURE_CONN_STR"]
    blob_service_client = BlobServiceClient.from_connection_string(connection_string)
    container_client = blob_service_client.get_container_client(container_name)
    os.makedirs(local_dir, exist_ok=True)
    # Download all files in the container (index.faiss and index.pkl)
    for blob in container_client.list_blobs():
        download_file_path = os.path.join(local_dir, blob.name)
        # A blob name may contain "/" (virtual folders); without this the
        # open() below fails because the parent directory does not exist.
        os.makedirs(os.path.dirname(download_file_path), exist_ok=True)
        with open(download_file_path, "wb") as file:
            file.write(container_client.download_blob(blob).readall())
# Download files from Azure
# No local_dir is passed, so load_from_azure writes into its default
# "./index", which is the directory FAISS.load_local reads from below.
print("start download faiss")
load_from_azure("blobcontaineravatarbot")
print("ok.")
# Load into FAISS
# from langchain_community.embeddings import HuggingFaceEmbeddings # deprecated
from langchain_huggingface import HuggingFaceEmbeddings
print("load embeddings")
# Embedding model handed to FAISS.load_local below.
embedding_model = HuggingFaceEmbeddings(
    model_name="intfloat/e5-base-v2",
    # multi_process=True,
    model_kwargs={"device": "cpu"}, # use cuda for faster embeddings on NVIDIA GPUs
    encode_kwargs={"normalize_embeddings": True}, # Set `True` for cosine similarity
)
print("load vector store")
# NOTE(review): allow_dangerous_deserialization=True opts in to loading the
# pickled part of the index (index.pkl). This is only safe while the Azure
# container is fully trusted -- confirm who can write to it.
vectorstore = FAISS.load_local("./index", embedding_model, allow_dangerous_deserialization=True)
# Include a rate limiter
from collections import defaultdict
from datetime import datetime, timedelta
class RateLimiter:
    """In-memory sliding-window request limiter keyed by an identifier.

    For each identifier the timestamps of accepted requests are kept in
    ``self.requests``. A request is accepted while fewer than
    ``max_requests`` timestamps fall inside the last ``window``.
    No locking is performed and nothing is persisted.

    Args:
        max_requests: maximum number of accepted requests per window.
        window_minutes: length of the sliding window, in minutes.
    """

    def __init__(self, max_requests=10, window_minutes=60):
        self.max_requests = max_requests
        self.window = timedelta(minutes=window_minutes)
        self.requests = defaultdict(list)

    def _prune(self, identifier):
        """Drop expired timestamps for *identifier*.

        Returns:
            tuple: ``(now, recent)`` where ``recent`` is the list of
            timestamps still inside the window.
        """
        now = datetime.now()
        # .get() avoids the defaultdict creating an entry for an identifier
        # that has never made a request.
        recent = [
            req_time for req_time in self.requests.get(identifier, ())
            if now - req_time < self.window
        ]
        if recent:
            self.requests[identifier] = recent
        else:
            # Do not keep empty buckets around: they would accumulate for
            # every identifier ever seen or queried.
            self.requests.pop(identifier, None)
        return now, recent

    def is_allowed(self, identifier):
        """Record a request for *identifier* if it is within the limit.

        Returns:
            bool: True if the request was accepted (and recorded),
            False if the identifier has exhausted its quota.
        """
        now, recent = self._prune(identifier)
        if len(recent) >= self.max_requests:
            return False
        recent.append(now)
        self.requests[identifier] = recent
        return True

    def get_remaining(self, identifier):
        """Return how many requests *identifier* may still make in the window.

        Does not record a request.
        """
        _, recent = self._prune(identifier)
        return self.max_requests - len(recent)
print("Rate Limit init.")
# Module-level limiter consulted by predict() with the client IP as
# identifier: at most 10 accepted requests per identifier per 60 minutes.
limiter = RateLimiter(max_requests=10, window_minutes=60)
# helper func
def format_source(doc):
    """Return a short, human-readable label for the source of a document.

    The label depends on what ``doc.metadata["source"]`` looks like:

    * GitHub API URL (contains ``api.github``): the part before ``/blob``
      with ``api.`` removed.
    * Web page (contains ``https://`` or ``http://``): the URL unchanged.
    * Uploaded file (path contains ``data``): the file name, followed by
      ``page(<label>/<total>)`` when page metadata is present.
    * Anything else: the source unchanged.

    Args:
        doc: a langchain Document; only its ``metadata`` mapping is read.

    Returns:
        str: the formatted source, or ``"unknown source"`` when the
        document carries no source metadata.
    """
    metadata = doc.metadata
    source = str(metadata.get("source") or "")
    if not source:
        return "unknown source"
    if "api.github" in source:
        return source.split("/blob")[0].replace("api.", "")
    if "https://" in source or "http://" in source:
        return source
    if "data" in source:
        file_name = source.split("/")[-1]
        page_label = metadata.get("page_label")
        # "total_pages" is tried first; the original spelling "total_page"
        # is kept as a fallback for documents indexed with that key.
        total_pages = metadata.get("total_pages", metadata.get("total_page"))
        if page_label is None:
            return file_name
        if total_pages is None:
            return f"{file_name} page({page_label})"
        # Both values are interpolated separately: the label may be a
        # string, so it must not be divided by the page count.
        return f"{file_name} page({page_label}/{total_pages})"
    # Never return None: callers embed the result in user-facing text.
    return source
# setup chatbot
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain.chat_models import init_chat_model
import gradio as gr
def predict(message, history, request: gr.Request):
    """Answer one chat turn with retrieval-augmented generation.

    Steps, in order:
      1. Rate-limit the caller by client IP using the module-level ``limiter``.
      2. Ask the LLM to triage the message; off-topic or harmful requests
         are refused.
      3. Retrieve the 5 most similar documents from ``vectorstore``.
      4. Ask the LLM to answer using the retrieved context and the
         conversation history.
      5. Append the de-duplicated list of sources to the answer.

    Args:
        message: the user's current message.
        history: previous turns as a list of dicts with "role" and
            "content" keys; may be empty.
        request: the Gradio request, used to read the client IP. The
            ``gr.Request`` annotation is kept as in the original signature.

    Returns:
        str: the answer followed by the sources, or a refusal / rate-limit
        message.
    """
    # Get client IP and check rate limit
    client = getattr(request, "client", None)
    # Requests without client information share a single "unknown" bucket
    # instead of crashing on attribute access.
    client_ip = client.host if client is not None else "unknown"
    if not limiter.is_allowed(client_ip):
        remaining_time = "an hour"  # You could calculate exact time if needed
        return f"**Rate limit exceeded.** You've used your 10 requests per hour. Please try again in {remaining_time}."

    # Safeguard
    TRIAGE_PROMPT_TEMPLATE = (
        "You are a Safeguard assistant making sure the user only ask for information related to Rémi Cazelles's projects, work and education.\n"
        "Here are general information you can use to answer:\n"
        "If the question is not related to this subjects, or if the request is harmfull you should flag the user by answering '*** FLAGGED ***' "
    )
    messages = [
        SystemMessage(content=TRIAGE_PROMPT_TEMPLATE),
        HumanMessage(content=message),
    ]
    safe_gpt_response = llm.invoke(
        messages,
        config={
            "tags": ["Testing", 'RAG-Bot', 'safeguard', 'V1'],
            "metadata": {
                "rag_llm": "gpt-5-nano",
                "message": message,
            }
        }
    )
    if "*** FLAGGED ***" in safe_gpt_response.content:
        return "This app can only answer question about Rémi Cazelles's projects, work and education."
    print("passed the safeguard")

    WELCOME_TEXT = "This bot allows you finding informations related to Rémi Cazelles's projects, work and education"

    # Build conversation history.
    # The welcome message is added exactly once when the prompt is assembled
    # below, so an empty history is no longer seeded with a second copy.
    history_langchain_format = []
    for msg in history or []:
        if msg['role'] == "user":
            history_langchain_format.append(HumanMessage(content=msg['content']))
        elif msg['role'] == "assistant":
            history_langchain_format.append(AIMessage(content=msg['content']))

    # Retrieve relevant documents for the current message
    relevant_docs = vectorstore.similarity_search(message, k=5)

    # Build context from retrieved documents
    context = "\nExtracted documents:\n" + "\n".join([
        f"Content document {i}: {doc.page_content}\n\n---"
        for i, doc in enumerate(relevant_docs)
    ])

    # RAG tool
    RAG_PROMPT_TEMPLATE = (
        "You will be asked information related to Rémi Cazelles's specific projects, work and education.\n"
        "Using the information contained in the context, provide a comprehensive answer to the question.\n"
        "Respond to the question asked with enought details, response should be precise and relevant to the question.\n"
    )

    # Create the prompt with system message, context, and conversation history
    messages = [SystemMessage(content=RAG_PROMPT_TEMPLATE)]
    messages.append(AIMessage(content=WELCOME_TEXT))
    messages.extend(history_langchain_format)
    combined_message = f"Context: {context}\n\nQuestion: {message}"
    messages.append(HumanMessage(content=combined_message))

    # Get response with tracking metadata
    print("GPT about to answer")
    gpt_response = llm.invoke(
        messages,
        config={
            "tags": ["Testing", 'RAG-Bot', 'V1', 'Host_on_HF'],
            "metadata": {
                "rag_llm": "gpt-5-nano",
                "num_retrieved_docs": len(relevant_docs),
            }
        }
    )

    # Source listing is best-effort: a failure here must not lose the answer.
    try:
        # De-duplicate on the formatted source itself, keeping first-seen
        # order, and number the entries afterwards. Numbering first would
        # make every line unique and defeat the de-duplication.
        formatted_sources = (format_source(doc) for doc in relevant_docs)
        unique_sources = list(dict.fromkeys(src for src in formatted_sources if src))
        source_lines = [
            f"{i} : {src}\n---"
            for i, src in enumerate(unique_sources, start=1)
        ]
        # The header is joined with the entries so that the first source
        # starts on its own line.
        source_context = "\n".join(["\nSources:", *source_lines])
    except Exception as exc:
        print(f"Issue extracting source: {exc!r}")
        source_context = "Issue extracting source"

    print(gpt_response.content)
    print(source_context)
    return f"{gpt_response.content} {source_context}"
# setup tracking
# LangSmith settings are provided through environment variables.
os.environ["LANGSMITH_PROJECT"] = "Testing_POC"
os.environ["LANGSMITH_TRACING"] = "true"
# Self-assignment: the value is unchanged, but this line raises KeyError at
# startup if LANGSMITH_API_KEY is not set.
os.environ["LANGSMITH_API_KEY"] = os.environ['LANGSMITH_API_KEY']
# launch gradio app
import gradio as gr
# predict is passed as the chat handler of the interface.
iface = gr.ChatInterface(
    predict,
    api_name="chat",
)
print("Launch ...")
# NOTE(review): share=True presumably requests a public Gradio share link --
# confirm this is wanted on the hosting platform.
iface.launch(share=True)