import asyncio
import os
import time

import fitz  # PyMuPDF
import pandas as pd
from fastapi import FastAPI, File, HTTPException, UploadFile
from fastapi.responses import JSONResponse
from langchain.embeddings.huggingface import HuggingFaceEmbeddings  # langchain_community.embeddings in newer LangChain releases
from pinecone import Pinecone
from transformers import pipeline
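
# Assumed dependency set (the original pins no versions): fastapi, uvicorn,
# pymupdf, transformers (plus torch), langchain, sentence-transformers,
# pinecone-client >= 3, pandas.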
app = FastAPI()

# Read the Pinecone API key from the environment rather than hardcoding a secret.
pc_api_key = os.environ["PINECONE_API_KEY"]
index_name = 'project'

# The v3 Pinecone client needs only the API key; the legacy 'environment'
# argument does not apply, and TLS verification is left enabled.
pc = Pinecone(api_key=pc_api_key)
index = pc.Index(index_name)

embed_model = HuggingFaceEmbeddings(
    model_name="sentence-transformers/all-MiniLM-L6-v2",
    encode_kwargs={'batch_size': 32},
)

# Process-wide flag: once a PDF has been ingested, later uploads are no-ops
# until the server restarts.
pdf_processed = False


def extract_text_from_pdf(pdf_file):
    """Return the concatenated text of every page in an uploaded PDF."""
    doc = fitz.open(stream=pdf_file.read(), filetype="pdf")
    text = ""
    for page in doc:
        text += page.get_text()
    return text


def chunk_text(text, chunk_size=40):
    """Split text into chunks of roughly chunk_size whitespace-delimited words."""
    words = text.split()
    return [' '.join(words[i:i + chunk_size]) for i in range(0, len(words), chunk_size)]
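
# Note: word-count chunking is a simple heuristic. all-MiniLM-L6-v2 truncates
# input at 256 word-piece tokens, so 40-word chunks fit comfortably within its
# limit; a token-aware splitter would be needed for substantially larger chunks.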


def delete_previous_vectors():
    """Clear the 'project' namespace so each upload replaces the previous document."""
    index.delete(delete_all=True, namespace='project')


def upsert_vectors(chunks, embed_model):
    """Embed each chunk and upsert (id, vector, metadata) triples into Pinecone."""
    df = pd.DataFrame(chunks, columns=['content'])
    ids = df.index.astype(str).tolist()
    embeddings = embed_model.embed_documents(df['content'].tolist())
    metadata = [{'text': chunk} for chunk in df['content']]
    # Materialise the generator: the client expects a sequence of
    # (id, values, metadata) tuples, not a one-shot zip iterator.
    index.upsert(vectors=list(zip(ids, embeddings, metadata)), namespace="project")
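
# For large PDFs a single upsert call can exceed Pinecone's per-request
# limits; passing batch_size (e.g. index.upsert(..., batch_size=100)) makes
# the client split the payload into smaller requests.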


def pinecone_db(question, embed_model):
    """Embed the question and return the text of the top-k most similar chunks."""
    ques_x = embed_model.embed_query(question)
    # include_metadata=True returns each match's stored chunk text directly,
    # which avoids a second fetch() round trip.
    similar_content = index.query(vector=ques_x, top_k=2, namespace='project', include_metadata=True)
    return [match['metadata']['text'] for match in similar_content['matches']]
@app.post("/upload_pdf/") |
|
|
async def upload_pdf(file: UploadFile = File(...)): |
|
|
global pdf_processed |
|
|
if pdf_processed: |
|
|
return JSONResponse(content={"message": "PDF already processed!"}, status_code=200) |
|
|
try: |
|
|
pdf_text = extract_text_from_pdf(file.file) |
|
|
chunks = chunk_text(pdf_text) |
|
|
|
|
|
delete_previous_vectors() |
|
|
upsert_vectors(chunks, embed_model) |
|
|
for _ in range(10): |
|
|
time.sleep(2) |
|
|
if index.describe_index_stats()['total_vector_count'] > 0: |
|
|
pdf_processed = True |
|
|
return JSONResponse(content={"message": "PDF processed and vectors upserted!"}, status_code=200) |
|
|
raise HTTPException(status_code=500, detail="Failed to upsert vectors") |
|
|
except Exception as e: |
|
|
raise HTTPException(status_code=500, detail=str(e)) |
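
# Example request, assuming the server runs locally on port 8000:
#
#   curl -X POST "http://localhost:8000/upload_pdf/" -F "file=@document.pdf"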
@app.post("/ask_question/") |
|
|
async def ask_question(question: str, model: str): |
|
|
if not question: |
|
|
raise HTTPException(status_code=400, detail="Question cannot be empty") |
|
|
|
|
|
context = pinecone_db(question, embed_model) |
|
|
|
|
|
if model == "Default": |
|
|
qa_pipeline = pipeline("question-answering", max_answer_length=512) |
|
|
elif model == "deepset/roberta-base-squad2": |
|
|
qa_pipeline = pipeline("question-answering", model="deepset/roberta-base-squad2", max_answer_length=512) |
|
|
elif model == "llama": |
|
|
qa_pipeline = pipeline("text-generation", model="meta-llama/Llama-3.2-1B-Instruct") |
|
|
|
|
|
if model != "llama": |
|
|
answer = qa_pipeline(question=question, context=str(context)) |
|
|
else: |
|
|
prompt = f''' |
|
|
<Instruction>: Answer to the following question using the provided context. Only provide Answer and explanation. |
|
|
<Context>: {".".join(context)} |
|
|
<Question>: {question} |
|
|
<Answer>: |
|
|
''' |
|
|
detailed_explanation = qa_pipeline(prompt, max_new_tokens=256) |
|
|
generated_text = detailed_explanation[0]['generated_text'] |
|
|
answer = generated_text[generated_text.find("<Answer>:") + len("<Answer>:"):].strip() |
|
|
return JSONResponse(content={"answer": answer}, status_code=200) |
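
# Example request (question and model are query parameters; model must be
# "Default", "deepset/roberta-base-squad2", or "llama"):
#
#   curl -X POST "http://localhost:8000/ask_question/?question=What%20is%20this%20about%3F&model=Default"

# Minimal dev entry point, assuming uvicorn is installed; equivalently, run
# `uvicorn main:app --reload` (module name assumed).
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)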