from dash import Dash, dcc, html, Input, Output, State,no_update as dash_no_update
from pandas import DataFrame , concat as pd_concat
from pymupdf import open as pdf_open
from json import loads as json_loads
from sklearn.decomposition import PCA
from numpy import array as np_array, ndarray,where as np_where
from base64 import b64decode
from scipy.spatial.distance import cosine
from requests import Session
from re import search as re_search
import plotly.express as px
import plotly.graph_objs as go
import dash_bootstrap_components as dbc
# Language table loaded once at startup.  Keys populate the language dropdown;
# values are passed to get_embedding() as the source-language code for the
# Google transliteration / translation endpoints.
# BUG FIX: the original `open("langs.json").read()` never closed the file
# handle; a context manager guarantees closure.
with open("langs.json") as _langs_file:
    langs: dict = json_loads(_langs_file.read())
# Dash application; the sketch Bootstrap theme is served from the assets folder.
app = Dash(__name__, external_stylesheets=["assets/bootstrap_sketch.css"])
# Top-level page layout, rendered top to bottom: upload button -> API-key
# input -> language dropdown -> status alert -> 3-D embedding graph ->
# flip-book PDF viewer -> reset button -> neighbour-count input -> query
# input.  Two dcc.Store components share state between callbacks.
app.layout = html.Div([
    # PDF upload; accepts multiple .pdf files.  Contents feed pdf_upload()
    # and the clientside blob-registering callback.
    dcc.Upload(dbc.Button('Upload File',style={'textAlign': 'center',"width":"100%"}),id="button-upload-id",multiple=True,accept=".pdf"),
    html.Hr(),
    # Hugging Face API token used for embedding inference calls.
    dbc.InputGroup([dbc.InputGroupText("API"),dbc.Input(id="input-api",type="text",placeholder="Enter a API KEY",value="",style={"textAlign":"center"})]),
    html.Hr(),
    # Source-language picker; options are the keys of langs.json.
    dcc.Dropdown(id="langselect",options= [i for i in langs.keys()],placeholder="Language",style={"textAlign":"center"}),
    # Transient success/error alert; auto-dismisses after 2 seconds.
    dbc.Alert(id='alert-id',is_open=False,duration=2000),
    # 3-D scatter plot of the PCA-projected page embeddings.
    dcc.Graph(id="graph"),
    html.Hr(),
    # Flip-book page viewer (assets/index.html); driven by the clientside callbacks.
    html.Iframe(id="flip-book-iframe",src="assets/index.html",style={"width":"100%","height":"66vh"}),
    html.Hr(),
    dbc.Button("Reset",id="reset-button",n_clicks=0,style={"width":"100%"}),
    html.Hr(),
    # How many nearest points get marked "Similar" in the plot.
    dbc.InputGroup([dbc.InputGroupText("Src Number"),dbc.Input(id="input-src",type="number",placeholder="Enter a number of nearest data points",value=10,style={"textAlign":"center"})]),
    html.Hr(),
    # Free-text query box and its trigger button.
    dbc.InputGroup([dbc.Input(id="input-query",type="text",placeholder="Enter a Query",value="Dhyaan shu chhe",style={"textAlign":"center"}),dbc.Button("Ask Query",id="query-button",n_clicks=0)]),
    # Uploaded page records (list of dicts) shared between callbacks.
    dcc.Store(id="intermediate-data"),
    # Embedding-model name read from the uploaded PDFs.
    dcc.Store(id="emb-data")
])
# Memoisation cache for get_embedding(); written with
# (text, srclang, model_name) tuple keys (see get_embedding).
embcache = {}
# Shared PCA projector that places embeddings in the 3-D plot.
pca = PCA(n_components=3)
# Persistent HTTP sessions (connection reuse) for the three external services:
transsession = Session()  # Google Translate endpoint
langconvsession = Session()  # Google Input Tools transliteration endpoint
embsession = Session()  # Hugging Face inference API
def book_to_df(embedding_metadata, name: str) -> DataFrame:
    """Build one book's DataFrame from its page-level embedding metadata.

    Each entry of ``embedding_metadata`` is a dict carrying "embedding",
    "maintext" and "text" keys.  Plot coordinates X/Y/Z come from re-fitting
    the module-level ``pca`` on this book's embeddings.
    """
    vectors = [np_array(entry["embedding"]) for entry in embedding_metadata]
    frame = DataFrame(pca.fit_transform(vectors), columns=["X", "Y", "Z"])
    frame["maintext"] = [entry["maintext"] for entry in embedding_metadata]
    frame["text"] = [entry["text"] for entry in embedding_metadata]
    frame["embedding"] = vectors
    frame["bookname"] = name
    # Page ids are 1-based "<bookname>:<page>" strings.
    frame["id"] = [name + ":" + str(page) for page in range(1, len(embedding_metadata) + 1)]
    return frame
def rank_vectors_cosine(vector_store: DataFrame, query_vector: ndarray, k: int = 10) -> DataFrame:
    """Rank the rows of ``vector_store`` by cosine similarity to ``query_vector``.

    Args:
        vector_store: frame with an "embedding" column of array-like vectors.
        query_vector: the reference embedding.
        k: how many top rows to label "Similar".

    Returns:
        A NEW DataFrame (the input is left unmodified — the original mutated
        the caller's frame in place), sorted by descending similarity, with:
          - "similarity": 1 - cosine distance to the query,
          - "similar":    "Similar" for the first ``k`` rows, else "Not similar",
          - "rank":       1-based position after sorting.
        The query point itself (similarity == 1) is dropped.
    """
    ranked = vector_store.copy()  # BUG FIX: don't mutate the caller's frame
    ranked["similarity"] = ranked["embedding"].apply(lambda vec: 1 - cosine(query_vector, vec))
    # BUG FIX: exact `!= 1.0` float comparison could leave the query point in
    # the results when rounding nudged its self-similarity below 1.0; use a
    # tolerance instead.
    ranked = ranked[ranked["similarity"] < 1.0 - 1e-12]
    ranked = ranked.sort_values(by="similarity", ascending=False).reset_index(drop=True)
    ranked["similar"] = np_where(ranked.index < k, "Similar", "Not similar")
    ranked["rank"] = ranked.index + 1
    return ranked
def repeat_pattern(df: DataFrame, repeat_value) -> DataFrame:
    """Interleave a fixed anchor row before every row of ``df``.

    For each column ``c`` the output values are
    ``[repeat_value[c], v0, repeat_value[c], v1, ...]``, doubling the row
    count.  This builds the vertex sequence for 3-D line traces that connect
    the query point (the anchor) to each of its similar points.

    Args:
        df: frame whose rows are the line endpoints.
        repeat_value: mapping from every column name of ``df`` to that
            column's anchor value.

    Returns:
        A new DataFrame with ``2 * len(df)`` rows.
    """
    # Idiomatic comprehension replaces the original manual append loop;
    # the inner tuple yields (anchor, value) pairs flattened in order.
    return DataFrame({
        column: [item
                 for value in df[column].values
                 for item in (repeat_value[column], value)]
        for column in df.columns
    })
def get_embedding(text: str, hfapi: str, model_name: str, srclang: str) -> ndarray:
    """Return the Hugging Face embedding for ``text``, with memoisation.

    If ``text`` contains Latin letters it is first transliterated from the
    ``srclang`` romanisation via Google Input Tools, then (always) translated
    to English via the Google Translate endpoint, before being sent to the
    Hugging Face inference API for ``model_name`` with the ``hfapi`` token.

    Returns ``None`` when the inference API does not answer with HTTP 200.
    """
    # BUG FIX: the cache was written under the (text, srclang, model_name)
    # tuple but read back with the bare ``text`` as key, which raised
    # KeyError on any hit.  Also capture the key BEFORE ``text`` is rewritten
    # by the transliteration/translation below, so a repeated query with the
    # same original text actually hits the cache.
    cache_key = (text, srclang, model_name)
    if cache_key in embcache:
        return embcache[cache_key]
    if re_search(r"[A-Za-z]", text):  # romanised input -> native script (transliteration)
        text = langconvsession.get(f"https://inputtools.google.com/request?itc={srclang}-t-i0-und&num=1&text={text}").json()[1][0][1][0]
    # Translate to English so the embedding model sees English text.
    text = "".join([i[0] for i in transsession.get(f'https://translate.googleapis.com/translate_a/single?client=gtx&sl={srclang}&tl=en&dt=t&q={text}').json()[0]])
    response = embsession.post("https://api-inference.huggingface.co/models/"+model_name, headers={"Authorization": "Bearer "+hfapi}, json={"inputs": text})
    if response.status_code != 200:
        return None
    embedding = np_array(response.json())
    embcache[cache_key] = embedding
    return embedding
@app.callback(
    [
        Output('alert-id', 'is_open'),
        Output('alert-id', 'color'),
        Output('alert-id', 'children'),
        Output("graph", "figure", allow_duplicate=True),  # Enable duplicate output
        Output("intermediate-data", "data"),
        Output("emb-data", "data")
    ],
    Input('button-upload-id', 'contents'),
    State('button-upload-id', 'filename'),
    prevent_initial_call=True
)
def pdf_upload(contents: list[str], filenames: list[str]):
    """Ingest uploaded PDFs that carry an "embedding_metadata" attachment.

    Every PDF must embed a JSON metadata file whose description names the
    embedding model, and all uploads must use the same model.  On success the
    combined per-page frame is plotted, stored in "intermediate-data", and
    the model name is stored in "emb-data"; any failure shows a danger alert.
    """
    try:
        frames = []
        model_name = ""
        for content, filename in zip(contents, filenames):
            book = pdf_open(stream=b64decode(content.split(',')[1]), filetype="pdf")
            title = filename.split(".")[0]
            if "embedding_metadata" not in book.embfile_names():
                raise Exception("No embedding metadata found in {} PDF file.".format(title))
            this_model = book.embfile_info("embedding_metadata")["description"].split(":", 1)[1]
            # Reject uploads embedded with a different model than the first book.
            if model_name != "" and model_name != this_model:
                raise Exception("Embedding Model is not matched")
            model_name = this_model
            frames.append(book_to_df(json_loads(book.embfile_get("embedding_metadata")), title))
        combined = pd_concat(frames)
        fig = px.scatter_3d(combined, x="X", y="Y", z="Z", title="3D Scatter Plot of RAG System Embeddings", hover_data={"X": False, "Y": False, "Z": False, "bookname": False}, hover_name="id", color="bookname")
        return True, "success", "{} pages Uploaded successfully!".format(len(combined)), fig, combined.to_dict("records"), model_name
    except Exception as e:
        # Surface the failure to the user via the alert component.
        return True, "danger", str(e), dash_no_update, None, None
@app.callback(
    Output("graph", "figure", allow_duplicate=True),
    [
        Input("reset-button", "n_clicks"),
        Input("intermediate-data", "data")
    ],
    prevent_initial_call=True
)
def reset(n_clicks, intermediate_data):
    """Redraw the plain per-book scatter plot, discarding query overlays."""
    # Guard clauses: require at least one click and stored upload data.
    if not n_clicks:
        return dash_no_update
    if not intermediate_data:
        return dash_no_update
    restored = DataFrame(intermediate_data)
    return px.scatter_3d(restored, x="X", y="Y", z="Z", title="3D Scatter Plot of RAG System Embeddings", hover_data={"X": False, "Y": False, "Z": False, "bookname": False}, hover_name="id", custom_data=["maintext", "text"], color="bookname")
@app.callback(
    Output("graph", "figure", allow_duplicate=True),  # Enable duplicate output
    [
        Input("graph", "clickData"),
        Input("intermediate-data", "data"),
        Input("input-src", "value")
    ],
    prevent_initial_call=True
)
def on_click_point(click_data, intermediate_data, input_num):
    """Highlight the ``input_num`` nearest pages to a clicked plot point."""
    if not all([click_data, intermediate_data, input_num, input_num != 0]):
        return dash_no_update
    point = click_data['points'][0]
    if "hovertext" not in point:
        # Clicks on traces without an id (e.g. connector lines) are ignored.
        return dash_no_update
    if point["customdata"][0] == "yes":
        # Points produced by a text query carry customdata "yes"; skip those.
        return dash_no_update
    point_id = point['hovertext']
    store = DataFrame(intermediate_data)
    query_row = store[store["id"] == point_id]
    qx, qy, qz = query_row[["X", "Y", "Z"]].values[0]
    query_vec = query_row["embedding"].values[0]
    ranked = rank_vectors_cosine(store, query_vec, input_num)
    scatter_fig = px.scatter_3d(ranked, x="X", y="Y", z="Z", hover_data={"X": False, "Y": False, "Z": False, "similar": False, "similarity": True, "rank": True}, hover_name="id", color="similar")
    # Mark the clicked point itself in green.
    scatter_fig.add_trace(go.Scatter3d(x=[qx], y=[qy], z=[qz], mode="markers", marker=dict(size=12, color="green"), name="Query", hovertext=[point_id]))
    # Draw a line from the clicked point to each of its similar neighbours.
    endpoints = ranked[ranked["similar"] == "Similar"]
    endpoints = endpoints[["X", "Y", "Z"]].reset_index(drop=True)
    endpoints = repeat_pattern(endpoints, {"X": qx, "Y": qy, "Z": qz})
    line_fig = px.line_3d(endpoints, x="X", y="Y", z="Z", hover_data={"X": False, "Y": False, "Z": False})
    combined = go.Figure(data=scatter_fig.data + line_fig.data)
    combined.update_layout(scatter_fig.layout)
    return combined
@app.callback(
    Output("graph", "figure", allow_duplicate=True),
    [
        Input("query-button", "n_clicks"),
        Input("intermediate-data", "data"),
        Input("input-src", "value"),
    ],
    State("input-query", "value"),
    State("input-api", "value"),
    State("emb-data", "data"),
    State("langselect", "value"),
    prevent_initial_call=True
)
def query(n_clicks_query, intermediate_data, input_num: int, query: str, api: str, embmodel: str, lang: str):
    """Embed a free-text query and highlight its nearest pages in the plot.

    The query is embedded with the same model as the uploaded books; PCA is
    re-fit on the book embeddings plus the query embedding so the query gets
    consistent 3-D coordinates, then the top ``input_num`` pages are marked
    "Similar" and connected to the query point with lines.
    """
    if not all([n_clicks_query, intermediate_data, query, lang, query.strip() != "", api.strip() != ""]):
        return dash_no_update
    df = DataFrame(intermediate_data)
    embs = df["embedding"].apply(np_array).tolist()
    queryemb = get_embedding(query, api, embmodel, langs[lang])
    # BUG FIX: get_embedding returns None when the inference API call fails;
    # previously that None flowed into pca.fit_transform and crashed the
    # callback with an opaque error.
    if queryemb is None:
        return dash_no_update
    # Re-fit PCA including the query so it can be placed in the same space.
    pcares = pca.fit_transform(embs + [queryemb])
    qx, qy, qz = pcares[-1]
    df[["X", "Y", "Z"]] = DataFrame(pcares[:-1], columns=["X", "Y", "Z"])
    results = rank_vectors_cosine(df, queryemb, input_num)
    # Tag rows so on_click_point ignores clicks on query-result points.
    results["ansofquery"] = "yes"
    fig = px.scatter_3d(results, x="X", y="Y", z="Z", hover_data={"X": False, "Y": False, "Z": False, "similar": False, "similarity": True, "rank": True}, hover_name="id", color="similar", custom_data=["ansofquery"])
    fig.add_trace(go.Scatter3d(x=[qx], y=[qy], z=[qz], mode="markers", marker=dict(size=12, color="green"), name="Query", customdata=["yes"]))
    fig2 = px.line_3d(repeat_pattern(results[results["similar"] == "Similar"][["X", "Y", "Z"]].reset_index(drop=True), {"X": qx, "Y": qy, "Z": qz}), x="X", y="Y", z="Z", hover_data={"X": False, "Y": False, "Z": False})
    return go.Figure(data=fig.data + fig2.data)
app.clientside_callback(
"""
function (clickData) {
if (clickData.points[0].hovertext) {
const match = clickData.points[0].hovertext.match(/^(.*?):(\d+)$/);
if (match) {
data = {
bookName: match[1],
page: parseInt(match[2], 10)
};
if (window.current_book != data.bookName) {
framewin = document.getElementById("flip-book-iframe").contentWindow
framewin.DFLIP.openURL(window.blobs[data.bookName])
framewin.DFLIP.activeLightBox.closeButton.hide()
}
window.current_book = data.bookName;
framewin.DFLIP.extrafeatures.gotoPage(data.page);
}
}
}
""",
Input("graph", "clickData"),
prevent_initial_call=True
)
app.clientside_callback(
"""
function (contentslist, filenameslist) {
window.blobs = {};
window.current_book = "";
contentslist.forEach((element,index) => {
const [contentType, base64Data] = element.split(',');
const binaryString = atob(base64Data);
const len = binaryString.length;
const bytes = new Uint8Array(len);
for (let i = 0; i < len; i++) {
bytes[i] = binaryString.charCodeAt(i);
}
const blob = new Blob([bytes], { type: contentType.split(':')[1].split(';')[0] });
const blobUrl = URL.createObjectURL(blob);
const file = filenameslist[index];
const filename = file.substring(0, file.lastIndexOf('.'));
window.blobs[filename] = blobUrl;
return;
});
}
""",
Input("button-upload-id", "contents"),
State("button-upload-id", "filename"),
prevent_initial_call=True,
)
# Entry point: serve the Dash app on all interfaces, port 7860.
if __name__ == '__main__':
    app.run(host="0.0.0.0",port=7860)