# Source: Hugging Face Space app.py (commit c2e4280) — web-page header residue removed.
import os
import re
import warnings
import pandas as pd
import backoff
from datetime import datetime
from dotenv import load_dotenv
from langchain_ollama import OllamaEmbeddings, ChatOllama
from langchain_text_splitters import MarkdownHeaderTextSplitter
from langchain_community.vectorstores import FAISS
from langchain_community.docstore.in_memory import InMemoryDocstore
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from docling.document_converter import DocumentConverter
from opik import Opik, track, evaluate
from opik.evaluation.metrics import Hallucination, AnswerRelevance
from opik.evaluation import models
import litellm
import opik
from litellm.integrations.opik.opik import OpikLogger
from litellm import completion, APIConnectionError
from langchain_huggingface import HuggingFaceEmbeddings, ChatHuggingFace, HuggingFaceEndpoint
from fastapi import FastAPI, UploadFile, File, HTTPException, Query
# FastAPI application instance; all endpoints below are registered on it.
app = FastAPI()
# Load environment variables
def load_env():
    """Load .env and ensure required Opik environment variables have defaults.

    SECURITY NOTE(review): the OPIK_API_KEY fallback below is a hard-coded
    secret committed to source. Rotate this key and supply it exclusively via
    the environment or .env file.
    """
    load_dotenv()
    os.environ.setdefault("OPIK_PROJECT_NAME", "Deepseek_eval")
    # Fallback only applies when the variable is not already set in the env.
    os.environ.setdefault("OPIK_API_KEY", "BX9OYn3NZBKuztCxL4XvMOeeI")
def initialize_opik():
    """Wire Opik logging into LiteLLM and configure the Opik client.

    SECURITY NOTE(review): api_key and workspace are hard-coded here; move
    them to environment variables and rotate the exposed key.
    """
    opik_logger = OpikLogger()
    # Every LiteLLM call is mirrored to Opik through this callback.
    litellm.callbacks = [opik_logger]
    opik.configure(api_key="BX9OYn3NZBKuztCxL4XvMOeeI",workspace="komalgupta991000-gmail-com",force=True)
# Initialize Opik and load environment variables
# NOTE: these run at import time (module-level side effects) and require the
# Opik service to be reachable when the module loads.
load_env()
initialize_opik()
# Initialize Opik Client
# Evaluation dataset handle; created on first run, reused afterwards.
dataset = Opik().get_or_create_dataset(
    name="Refugee_crises_mental_health",
    description="Dataset on refugee crises and mental health"
)
@app.post("/upload_dataset/")
def upload_dataset(file: UploadFile = File(...)):
    """Read an uploaded Excel file and insert its rows into the Opik dataset."""
    try:
        records = pd.read_excel(file.file).to_dict(orient='records')
        dataset.insert(records)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return {"message": "Dataset uploaded successfully"}
# To use the uploaded dataset in the evaluation task manually
# def upload_dataset():
#     df = pd.read_excel("dataset.xlsx")
#     dataset.insert(df.to_dict(orient='records'))
#     return "Dataset uploaded successfully"
# Initialize LLM Models
# Local DeepSeek model served by Ollama; low temperature for factual answers.
# NOTE(review): base_url is hard-coded to localhost — make it configurable
# before deploying anywhere the Ollama server isn't co-located.
model = ChatOllama(model="deepseek-r1:7b", base_url="http://localhost:11434", temperature=0.2, max_tokens=200)
# model1 = models.LiteLLMChatModel(model_name="ollama/gemma2:2b", base_url="http://localhost:11434")
# embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2", model_kwargs={'device': 'cuda'}, encode_kwargs={'normalize_embeddings': True})
# model = ChatHuggingFace(llm=HuggingFaceEndpoint(repo_id="HuggingFaceH4/zephyr-7b-beta", task="text-generation", max_new_tokens=512, temperature=0.2))
# Convert Document to Markdown
def load_and_convert_document(file_path):
    """Convert the document at *file_path* into a Markdown string via docling."""
    converter = DocumentConverter()
    conversion = converter.convert(file_path)
    return conversion.document.export_to_markdown()
# Markdown Splitting
def get_markdown_splits(markdown_content):
    """Split Markdown content into chunks keyed on the first three header levels."""
    header_levels = [("#", "Header 1"), ("##", "Header 2"), ("###", "Header 3")]
    return MarkdownHeaderTextSplitter(header_levels).split_text(markdown_content)
# Vector Store Setup
def setup_vector_store(documents):
    """Embed *documents* with Ollama, build a FAISS index, and persist it to disk."""
    embedder = OllamaEmbeddings(model='nomic-embed-text', base_url="http://localhost:11434")
    store = FAISS.from_documents(documents, embedder)
    store.save_local("deepseek_ollama/deepseek_db")
    return store
# Load Vector Store
# NOTE: runs at import time; requires the FAISS index produced by
# setup_vector_store() to already exist on disk, plus a local Ollama server.
embeddings = OllamaEmbeddings(model='nomic-embed-text', base_url="http://localhost:11434")
# allow_dangerous_deserialization is required by FAISS.load_local for its
# pickled metadata — acceptable only because this index is built locally by
# this same application, never from untrusted sources.
vectorstore = FAISS.load_local("deepseek_ollama/deepseek_db", embeddings, allow_dangerous_deserialization=True)
# Maximal-marginal-relevance retrieval, top-2 chunks per query.
retriever = vectorstore.as_retriever(search_type="mmr", search_kwargs={'k': 2})
# Create RAG Chain
def create_rag_chain(retriever):
    """Build the retrieval-augmented chain: retriever -> prompt -> model -> str.

    The prompt constrains the assistant to the refugee-crisis / child mental
    health domain and instructs it to refuse out-of-scope questions.
    NOTE(review): references module-level `format_docs` and `model`; both must
    exist by the time the chain is invoked (they do at the call on the
    `rag_chain = ...` line below this function in the file).
    """
    prompt_template = ChatPromptTemplate.from_template(
        """
You are an AI research assistant specializing in the study of the refugee crisis and its impact on child mental health.
- Your primary role is to assist researchers,professionals by analyzing, summarizing, and generating insights based on provided research papers and academic data
Context: You will be given a large dataset containing research papers and studies on this topic. Your responses must be strictly derived from the provided research data and should focus solely on answering user queries related to the refugee crisis and its effects on child mental health.
Strict Guidelines:
Scope Restriction: You *must not answer any questions outside the refugee crisis and child mental health domain.* If a user query is unrelated, politely refuse to answer.
If you don't know the answer, just say that you don't know, don't try to make up an answer.
Don't use your own knowledge or experience to answer questions.
For generic queries like 'Hi,' 'Hello,' 'What can you do for me?' or 'Who are you?', respond with a pleasant greeting and a brief introduction of your role. Stick to simple greetings like 'Hi there' or 'Hey,' and avoid assigning a random name to the user.
Fact-Based Responses: Your answers should be strictly based on the provided research data. Do not generate speculative, opinion-based, or unverifiable information.
Academic Integrity: Provide responses in a structured, well-cited manner, ensuring academic rigor and clarity.
Example Scenarios:
Allowed Queries:
"What are the psychological effects of forced displacement on children?"
"How does prolonged refugee status impact child cognitive development?"
"Are there any studies on PTSD prevalence in refugee children?"
Few example of Restricted Queries:
"Tell me about the global economic impact of the refugee crisis."
"Can you summarize recent political policies on immigration?"
"What are some coping mechanisms for stress in general?"
Any thing outside the refugee crisis and child mental health domain should be avoided.
Your primary goal is to advance research in this field by providing data-backed, insightful, and academically sound responses.
VALIDATION STEP (REQUIRED):
1. First, determine if the question is related to refugee crisis and child mental health.
2. If unrelated, respond ONLY with: "This question is outside my specialized domain of refugee crisis and child mental health research. I can only answer questions related to these topics."
3. Do not provide any other information for unrelated questions.
Question: {question}
Context: {context}
Answer:
"""
    )
    # LCEL pipeline: retrieved docs are flattened to plain text, the question
    # passes through unchanged, then the prompt is rendered and sent to the LLM.
    return (
        {"context": retriever | format_docs, "question": RunnablePassthrough()}
        | prompt_template
        | model
        | StrOutputParser()
    )
def format_docs(docs):
    """Join each document's page_content with blank-line separators."""
    parts = [doc.page_content for doc in docs]
    return "\n\n".join(parts)
def clean_response(response):
    """Remove any <think>...</think> reasoning spans and trim surrounding whitespace."""
    think_block = re.compile(r'<think>.*?</think>', re.DOTALL)
    return think_block.sub('', response).strip()
# Assemble the RAG chain once at import time; reused by every request.
rag_chain = create_rag_chain(retriever)
@track()
def llm_chain(input_text):
    """Run the RAG chain for *input_text*, returning answer text plus retrieved context.

    Returns ``{"response": str, "context_used": str}`` on success, or
    ``{"error": str}`` on failure.
    NOTE(review): callers that index result["response"] will raise KeyError on
    the error path — consider a uniform return shape.
    NOTE(review): the retriever is invoked here *and* again inside rag_chain,
    so the context recorded may not be exactly what the chain used.
    """
    try:
        context = "\n".join(doc.page_content for doc in retriever.invoke(input_text))
        # Stream the chain and concatenate only string chunks into the answer.
        response = "".join(chunk for chunk in rag_chain.stream(input_text) if isinstance(chunk, str))
        return {"response": clean_response(response), "context_used": context}
    except Exception as e:
        return {"error": str(e)}
def evaluation_task(x):
    """Map one dataset row through the RAG pipeline for Opik evaluation.

    Parameters
    ----------
    x : dict
        Dataset row; expected to contain 'user_question' and 'expected_output'.

    Returns
    -------
    dict
        ``{"input", "output", "context", "expected"}`` — the shape the scoring
        metrics consume. On failure the same keys are returned with empty
        output/context so scoring never crashes on a missing key.
    """
    question = x.get('user_question', "")
    expected = x.get('expected_output', "")
    try:
        result = llm_chain(question)
        return {
            "input": question,
            "output": result["response"],
            "context": result["context_used"],
            "expected": expected,
        }
    except Exception:
        # Bug fix: the previous error path dropped the "expected" key and
        # misfiled expected_output under "context", which broke the metrics.
        return {"input": question, "output": "", "context": "", "expected": expected}
# experiment_name = f"Deepseek_{dataset.name}_{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}"
# metrics = [Hallucination(model=model1), AnswerRelevance(model=model1)]
@app.post("/run_evaluation/")
@backoff.on_exception(backoff.expo, APIConnectionError, max_tries=3, max_time=300)
def run_evaluation():
    """Run the Opik evaluation over the dataset with hallucination/relevance metrics.

    Retries with exponential backoff (up to 3 tries / 300s) only on transient
    ``APIConnectionError``; all other failures surface immediately as HTTP 500.
    Bug fix: the previous decorator retried on bare ``Exception``, which
    re-ran *every* failure — including the handler's own HTTPException —
    three times before surfacing it.
    """
    # Unique experiment name per run so results don't collide in Opik.
    experiment_name = f"Deepseek_{dataset.name}_{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}"
    metrics = [Hallucination(), AnswerRelevance()]
    try:
        evaluate(
            experiment_name=experiment_name,
            dataset=dataset,
            task=evaluation_task,
            scoring_metrics=metrics,
            experiment_config={"model": model},
            task_threads=2
        )
    except APIConnectionError:
        # Let backoff observe connection errors and retry, instead of
        # masking them as HTTP 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    return {"message": "Evaluation completed successfully"}
# @backoff.on_exception(backoff.expo, (APIConnectionError, Exception), max_tries=3, max_time=300)
# def run_evaluation():
# return evaluate(experiment_name=experiment_name, dataset=dataset, task=evaluation_task, scoring_metrics=metrics, experiment_config={"model": model}, task_threads=2)
# run_evaluation()
# Create Vector Database
def create_db(source=r'AI Agent'):
    """Convert every document under *source* to Markdown chunks and build the FAISS index.

    Parameters
    ----------
    source : str, optional
        Directory containing the source documents. Defaults to 'AI Agent',
        preserving the previously hard-coded path.

    Returns
    -------
    str
        Status message on success.
    """
    all_documents = []
    for filename in os.listdir(source):
        file_path = os.path.join(source, filename)
        # Bug fix: os.listdir also yields subdirectories and other non-files,
        # which would crash the document converter — skip them.
        if not os.path.isfile(file_path):
            continue
        markdown_content = load_and_convert_document(file_path)
        all_documents.extend(get_markdown_splits(markdown_content))
    setup_vector_store(all_documents)
    return "Database created successfully"
@app.get("/query/")
@track()
def chain(input_text: str = Query(..., description="Enter your question")):
    """Answer a user question via the RAG pipeline.

    Bug fix: ``@app.get`` must be the OUTERMOST decorator. Previously
    ``@track()`` sat on top, so FastAPI registered the raw function before
    tracking was applied and endpoint calls were never traced; with
    ``@track()`` innermost, the tracked wrapper is what gets registered.
    """
    try:
        result = llm_chain(input_text)
        if "error" in result:
            # llm_chain signals failure with an {"error": ...} dict; surface
            # it as a clean 500 instead of a confusing KeyError.
            raise HTTPException(status_code=500, detail=result["error"])
        return result["response"]
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/")
def root():
    """Health-check endpoint confirming the API is up."""
    status = {"message": "API is running successfully"}
    return status
# if __name__ == "__main__":
# questions=[ "famous places to visit in india","what is the elligibility criteria to get green card in usa"]
# # Questions for retrieval
# # Answer questions
# for question in questions:
# print(f"Question: {question}")
# for chunk in llm_chain(question):
# print(chunk, end="", flush=True)
# print("\n" + "-" * 50 + "\n")