import os
import re
import pandas as pd
import backoff
import asyncio
from datetime import datetime
from dotenv import load_dotenv
from langchain_ollama import OllamaEmbeddings, ChatOllama
from langchain_community.vectorstores import FAISS
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from opik import Opik, track, evaluate
from opik.evaluation.metrics import Hallucination, AnswerRelevance
import litellm
import opik
from fastapi.responses import StreamingResponse
from litellm.integrations.opik.opik import OpikLogger
from litellm import completion, APIConnectionError
from fastapi import FastAPI, UploadFile, File, HTTPException, Query, Response
from langchain_community.document_loaders import PyMuPDFLoader, UnstructuredWordDocumentLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
app = FastAPI()
def initialize_opik():
    opik_logger = OpikLogger()
    litellm.callbacks = [opik_logger]
    opik.configure(
        api_key=os.getenv("OPIK_API_KEY"),
        workspace=os.getenv("workspace"),
        force=True,
    )

# Load environment variables, then initialize Opik (it reads the env vars above)
load_dotenv()
initialize_opik()
# Initialize Opik client and dataset
dataset = Opik().get_or_create_dataset(
    name="Cyfuture_faq",
    description="Dataset on IGL FAQ",
)
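# The evaluation task below expects each dataset record to contain a
# 'user_question' and an 'expected_output' column (see evaluation_task).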
# Route path below is assumed; the original decorator was not preserved.
@app.post("/upload-dataset")
def upload_dataset(file: UploadFile = File(...)):
    try:
        df = pd.read_excel(file.file)
        dataset.insert(df.to_dict(orient='records'))
        return {"message": "Dataset uploaded successfully"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
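# Illustrative request (route path and file name assumed):
#   curl -F "file=@dataset.xlsx" http://127.0.0.1:8000/upload-dataset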
# Manual variant: load the dataset from a local Excel file for use in the
# evaluation task (named distinctly so it does not shadow the endpoint handler)
def upload_dataset_manual():
    df = pd.read_excel("dataset.xlsx")
    dataset.insert(df.to_dict(orient='records'))
    return "Dataset uploaded successfully"
# Initialize the LLM (num_predict is ChatOllama's token-limit parameter; max_tokens is not a ChatOllama field)
model = ChatOllama(model="deepseek-r1:7b", base_url="http://localhost:11434", temperature=0.2, num_predict=200)
def load_documents(folder_path):
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=50)
    all_documents = []
    os.makedirs('data', exist_ok=True)
    for filename in os.listdir(folder_path):
        file_path = os.path.join(folder_path, filename)
        if filename.endswith('.pdf'):
            loader = PyMuPDFLoader(file_path)
        elif filename.endswith('.docx'):
            loader = UnstructuredWordDocumentLoader(file_path)
        else:
            continue  # Skip unsupported files
        documents = loader.load()
        all_documents.extend(text_splitter.split_documents(documents))
        print(f"Processed and indexed {filename}")
    return all_documents
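# Illustrative call (folder name taken from create_db below):
#   docs = load_documents("AI Agent")  # PDF/DOCX files split into ~1000-char chunks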
# Vector Store Setup
def setup_vector_store(documents):
    embeddings = OllamaEmbeddings(model='nomic-embed-text', base_url="http://localhost:11434")
    vectorstore = FAISS.from_documents(documents, embeddings)
    vectorstore.save_local("deepseek_cyfuture")
    return vectorstore
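# Hedged sketch: a quick sanity check of the persisted index. Never called by
# the app; the query string is illustrative.
def _sanity_check_index():
    emb = OllamaEmbeddings(model='nomic-embed-text', base_url="http://localhost:11434")
    vs = FAISS.load_local("deepseek_cyfuture", emb, allow_dangerous_deserialization=True)
    for doc in vs.similarity_search("How do I pay my bill?", k=2):
        print(doc.page_content[:120])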
# Create RAG Chain
def create_rag_chain(retriever):
    prompt_template = ChatPromptTemplate.from_template(
        """
        You are an AI question-answering assistant specialized in answering user queries strictly from the provided context. Give a detailed answer to the user's question based on the context.
        STRICT RULES:
        - For generic queries like "hi", "hello", or "how are you", respond with a generic greeting such as "Hello! How can I assist you today?"
        - You *must not* answer any questions outside the provided context.
        - If the question is unrelated to billing, payments, customer, or meter reading, respond with exactly:
        **"This question is outside my specialized domain."**
        - Do NOT attempt to generate an answer from loosely related context.
        - If the context does not contain a valid answer, simply state: **"I don't know the answer."**
        VALIDATION STEPS:
        1. Check if the query is related to **billing, payments, customer, or meter reading**.
        2. If NOT, respond with: `"This question is outside my specialized domain."` and nothing else.
        3. If the context does not contain directly relevant data, try to find the best possible answer from the context.
        4. Do NOT generate speculative answers.
        5. If the generated answer does not address the question, find the best possible answer from the context; you may add more relevant context to the answer.
        Question: {question}
        Context: {context}
        Answer:
        """
    )
    return (
        {"context": retriever | format_docs, "question": RunnablePassthrough()}
        | prompt_template
        | model
        | StrOutputParser()
    )
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)
def clean_response(response):
    # Strip the model's <think>...</think> reasoning block from the output
    return re.sub(r'<think>.*?</think>', '', response, flags=re.DOTALL).strip()
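# e.g. clean_response("<think>reasoning...</think>Final answer") -> "Final answer"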
def llm_chain(input_text):
    try:
        # Retrieve context separately so it can be logged alongside the answer
        context = "\n".join(doc.page_content for doc in retriever.invoke(input_text))
        response = "".join(chunk for chunk in rag_chain.stream(input_text) if isinstance(chunk, str))
        return {"response": clean_response(response), "context_used": context}
    except Exception as e:
        return {"error": str(e)}
def evaluation_task(x):
    try:
        result = llm_chain(x['user_question'])
        return {"input": x['user_question'], "output": result["response"], "context": result["context_used"], "expected": x['expected_output']}
    except Exception as e:
        # On failure, return an empty output/context but keep the expected answer
        return {"input": x['user_question'], "output": "", "context": "", "expected": x['expected_output']}
# Route path below is assumed; the original decorator was not preserved.
@app.post("/evaluate")
def run_evaluation():
    experiment_name = f"Deepseek_{dataset.name}_{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}"
    metrics = [Hallucination(), AnswerRelevance()]
    try:
        evaluate(
            experiment_name=experiment_name,
            dataset=dataset,
            task=evaluation_task,
            scoring_metrics=metrics,
            experiment_config={"model": "deepseek-r1:7b"},  # use a serializable model name, not the ChatOllama object
            task_threads=2
        )
        return {"message": "Evaluation completed successfully"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
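# Note: Hallucination and AnswerRelevance are LLM-as-judge metrics; unless a
# judge model is passed in, they call an external model through LiteLLM, so the
# corresponding API key must be available in the environment.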
# Alternative with retry/backoff (kept for reference):
# @backoff.on_exception(backoff.expo, (APIConnectionError, Exception), max_tries=3, max_time=300)
# def run_evaluation():
#     return evaluate(experiment_name=experiment_name, dataset=dataset, task=evaluation_task, scoring_metrics=metrics, experiment_config={"model": model}, task_threads=2)
# run_evaluation()
# Create Vector Database
def create_db():
    source = r'AI Agent'
    documents = load_documents(source)
    setup_vector_store(documents)
    return "Database created successfully"
# Load the persisted vector store and build the RAG chain at startup
embeddings = OllamaEmbeddings(model='nomic-embed-text', base_url="http://localhost:11434")
vectorstore = FAISS.load_local("deepseek_cyfuture", embeddings, allow_dangerous_deserialization=True)
retriever = vectorstore.as_retriever(search_kwargs={'k': 2})
rag_chain = create_rag_chain(retriever)
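# Illustrative direct use of the chain (outside the API):
#   answer = rag_chain.invoke("How do I register for a new connection?")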
# Route path below is assumed; the original decorator was not preserved.
@app.get("/chain")
def chain(input_text: str = Query(..., description="Enter your question")):
    try:
        def generate():
            buffer = ""  # Temporary buffer to hold chunks until `</think>` is found
            start_sending = False
            for chunk in rag_chain.stream(input_text):
                if not isinstance(chunk, str):
                    continue
                if start_sending:
                    yield chunk  # Continue yielding after the `</think>` tag
                else:
                    buffer += chunk  # Append chunk to buffer
                    if "</think>" in buffer:
                        start_sending = True
                        # Yield everything after `</think>`, dropping the reasoning block
                        yield buffer.split("</think>", 1)[1]
                        buffer = ""  # Clear the buffer after sending the first response
        return StreamingResponse(generate(), media_type="text/plain")
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
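# Illustrative request (route path assumed above; question taken from the test list below):
#   curl "http://127.0.0.1:8000/chain?input_text=how%20to%20make%20payments%3F"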
# Route path below is assumed; the original decorator was not preserved.
@app.get("/")
def read_root():
    return {"message": "Welcome to the AI Assistant API!"}
if __name__ == "__main__":
    # Start the FastAPI app
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8000)
# Manual retrieval test (kept for reference):
# questions = [
#     "Is the website accessible through mobile also? please tell the benefits of it",
#     "How do I register for a new connection?",
#     "how to make payments?",
# ]
# create_db()
# # Load Vector Store
# embeddings = OllamaEmbeddings(model='nomic-embed-text', base_url="http://localhost:11434")
# vectorstore = FAISS.load_local("deepseek_cyfuture", embeddings, allow_dangerous_deserialization=True)
# retriever = vectorstore.as_retriever(search_kwargs={'k': 3})
# rag_chain = create_rag_chain(retriever)
# for question in questions:
#     print(f"Question: {question}")
#     for chunk in rag_chain.stream(question):
#         print(chunk, end="", flush=True)
#     print("\n" + "-" * 50 + "\n")