from base64 import b64decode
from json import loads as json_loads
from re import search as re_search
from typing import Optional

import dash_bootstrap_components as dbc
import plotly.express as px
import plotly.graph_objs as go
from dash import Dash, dcc, html, Input, Output, State, no_update as dash_no_update
from numpy import array as np_array, ndarray, where as np_where
from pandas import DataFrame, concat as pd_concat
from pymupdf import open as pdf_open
from requests import Session
from scipy.spatial.distance import cosine
from sklearn.decomposition import PCA

# Keys populate the language dropdown; values are passed to the
# transliteration/translation services as the source-language code.
with open("langs.json", encoding="utf-8") as langs_file:
    langs: dict = json_loads(langs_file.read())

app = Dash(__name__, external_stylesheets=["assets/bootstrap_sketch.css"])

app.layout = html.Div([
    dcc.Upload(
        dbc.Button('Upload File', style={'textAlign': 'center', "width": "100%"}),
        id="button-upload-id",
        multiple=True,
        accept=".pdf",
    ),
    html.Hr(),
    dbc.InputGroup([
        dbc.InputGroupText("API"),
        dbc.Input(
            id="input-api",
            type="text",
            placeholder="Enter a API KEY",
            value="",
            style={"textAlign": "center"},
        ),
    ]),
    html.Hr(),
    dcc.Dropdown(
        id="langselect",
        options=list(langs),
        placeholder="Language",
        style={"textAlign": "center"},
    ),
    dbc.Alert(id='alert-id', is_open=False, duration=2000),
    dcc.Graph(id="graph"),
    html.Hr(),
    html.Iframe(
        id="flip-book-iframe",
        src="assets/index.html",
        style={"width": "100%", "height": "66vh"},
    ),
    html.Hr(),
    dbc.Button("Reset", id="reset-button", n_clicks=0, style={"width": "100%"}),
    html.Hr(),
    dbc.InputGroup([
        dbc.InputGroupText("Src Number"),
        dbc.Input(
            id="input-src",
            type="number",
            placeholder="Enter a number of nearest data points",
            value=10,
            style={"textAlign": "center"},
        ),
    ]),
    html.Hr(),
    dbc.InputGroup([
        dbc.Input(
            id="input-query",
            type="text",
            placeholder="Enter a Query",
            value="Dhyaan shu chhe",
            style={"textAlign": "center"},
        ),
        dbc.Button("Ask Query", id="query-button", n_clicks=0),
    ]),
    dcc.Store(id="intermediate-data"),
    dcc.Store(id="emb-data"),
])

# Process-wide state shared by the callbacks below.
embcache = {}  # (query text, source language, model name) -> embedding
pca = PCA(n_components=3)
transsession = Session()
langconvsession = Session()
embsession = Session()


def book_to_df(embedding_metadata, name: str) -> DataFrame:
    """Build the per-page frame for one uploaded book.

    Args:
        embedding_metadata: Sequence of dicts, each providing the keys
            ``"embedding"``, ``"maintext"`` and ``"text"``.
        name: Book name; stored in ``bookname`` and used as the id prefix.

    Returns:
        A frame with the 3-D PCA projection (``X``, ``Y``, ``Z``), the two
        text columns, the original embedding, ``bookname`` and an ``id`` of
        the form ``"<name>:<n>"`` with ``n`` counting from 1.
    """
    embedding = [np_array(entry["embedding"]) for entry in embedding_metadata]
    # The shared PCA is re-fitted on every call, i.e. separately per book.
    df = DataFrame(pca.fit_transform(embedding), columns=["X", "Y", "Z"])
    df["maintext"] = [entry["maintext"] for entry in embedding_metadata]
    df["text"] = [entry["text"] for entry in embedding_metadata]
    df["embedding"] = embedding
    df["bookname"] = name
    df["id"] = [f"{name}:{page}" for page in range(1, len(embedding_metadata) + 1)]
    return df


def rank_vectors_cosine(vector_store: DataFrame, query_vector: ndarray, k: int = 10) -> DataFrame:
    """Rank the stored embeddings by cosine similarity to ``query_vector``.

    Args:
        vector_store: Frame with an ``embedding`` column.
        query_vector: Vector every stored embedding is compared against.
        k: Number of top-ranked rows labelled ``"Similar"``.

    Returns:
        A new frame sorted by descending ``similarity`` with the added
        columns ``similarity``, ``similar`` and ``rank`` (1-based). The
        frame passed in is left untouched.
    """
    # Work on a copy so the caller's frame does not silently gain columns.
    ranked = vector_store.copy()
    ranked['similarity'] = ranked['embedding'].apply(
        lambda vec: 1 - cosine(query_vector, vec)
    )
    # Rows with similarity exactly 1.0 are dropped -- presumably to leave out
    # the query point itself when it is part of the store.
    ranked = ranked[ranked['similarity'] != 1.0]
    ranked = ranked.sort_values(by="similarity", ascending=False).reset_index(drop=True)
    ranked['similar'] = np_where(ranked.index < k, "Similar", "Not similar")
    ranked['rank'] = ranked.index + 1
    return ranked


def repeat_pattern(df: DataFrame, repeat_value):
    """Interleave a fixed value before every row of ``df``.

    For each column the result is ``[anchor, v0, anchor, v1, ...]`` where
    ``anchor`` is ``repeat_value[column]``. The callers use this to draw line
    segments running from the query point to each similar point.

    Args:
        df: Frame whose rows are interleaved.
        repeat_value: Mapping from column name to the value to insert.

    Returns:
        A new frame with twice as many rows as ``df``.
    """
    interleaved = {}
    for column in df.columns:
        anchor = repeat_value[column]
        column_values = []
        for value in df[column].values:
            column_values.extend((anchor, value))
        interleaved[column] = column_values
    return DataFrame(interleaved)


def get_embedding(text: str, hfapi: str, model_name: str, srclang: str) -> Optional[ndarray]:
    """Return the embedding of a user query, using the in-memory cache.

    Text containing Latin letters is first transliterated into the script of
    ``srclang``; the text is then translated to English and sent to the
    Hugging Face inference endpoint of ``model_name``.

    Args:
        text: Query as typed by the user.
        hfapi: Hugging Face API token sent as a bearer token.
        model_name: Model identifier appended to the inference URL.
        srclang: Source-language code used for transliteration/translation.

    Returns:
        The embedding as an array, or ``None`` when the embedding request
        does not answer with HTTP 200.
    """
    # Key on the text as typed: the same tuple is used for lookup and store,
    # so a repeated query actually hits the cache.
    cache_key = (text, srclang, model_name)
    cached = embcache.get(cache_key)
    if cached is not None:
        return cached

    if re_search(r"[A-Za-z]", text):
        # Transliteration. ``params`` lets requests escape the query text.
        text = langconvsession.get(
            "https://inputtools.google.com/request",
            params={"itc": f"{srclang}-t-i0-und", "num": 1, "text": text},
        ).json()[1][0][1][0]

    # NOTE(review): the flattened source does not show whether translation was
    # nested under the transliteration branch; it is applied to every query
    # here -- confirm.
    translation = transsession.get(
        "https://translate.googleapis.com/translate_a/single",
        params={"client": "gtx", "sl": srclang, "tl": "en", "dt": "t", "q": text},
    ).json()[0]
    text = "".join([segment[0] for segment in translation])

    responce = embsession.post(
        "https://api-inference.huggingface.co/models/" + model_name,
        headers={"Authorization": "Bearer " + hfapi},
        json={"inputs": text},
    )
    if responce.status_code != 200:
        return None

    embedding = np_array(responce.json())
    embcache[cache_key] = embedding
    return embedding


def _scatter_figure(df: DataFrame):
    """Build the base 3-D scatter plot shared by upload and reset."""
    return px.scatter_3d(
        df,
        x="X",
        y="Y",
        z="Z",
        title="3D Scatter Plot of RAG System Embeddings",
        hover_data={"X": False, "Y": False, "Z": False, "bookname": False},
        hover_name="id",
        # Same custom data for both figures, so every clicked point carries
        # a ``customdata`` entry.
        custom_data=["maintext", "text"],
        color="bookname",
    )


@app.callback(
    [
        Output('alert-id', 'is_open'),
        Output('alert-id', 'color'),
        Output('alert-id', 'children'),
        Output("graph", "figure", allow_duplicate=True),  # Enable duplicate output
        Output("intermediate-data", "data"),
        Output("emb-data", "data"),
    ],
    Input('button-upload-id', 'contents'),
    State('button-upload-id', 'filename'),
    prevent_initial_call=True,
)
def pdf_upload(contents: list[str], filenames: list[str]):
    """Load the uploaded PDFs and plot their embedded page embeddings.

    Each PDF must carry an embedded file named ``embedding_metadata`` whose
    description names the embedding model after the first ``":"``; all PDFs
    of one upload must name the same model.

    Returns:
        ``(alert open, alert colour, alert text, figure, page records,
        embedding model name)``. On any failure the alert shows the error,
        the figure is left unchanged and both stores are cleared.
    """
    try:
        data = []
        centralemb = ""
        for content, filename in zip(contents, filenames):
            # Strip only the extension, matching how the browser-side callback
            # names the blob for the flip book.
            name = filename.rsplit(".", 1)[0]
            with pdf_open(stream=b64decode(content.split(',')[1]), filetype="pdf") as pdf:
                if "embedding_metadata" not in pdf.embfile_names():
                    raise ValueError(f"No embedding metadata found in {name} PDF file.")
                embname = pdf.embfile_info("embedding_metadata")["description"].split(":", 1)[1]
                if centralemb != "" and centralemb != embname:
                    raise ValueError("Embedding Model is not matched")
                centralemb = embname
                data.append(book_to_df(json_loads(pdf.embfile_get("embedding_metadata")), name))
        df = pd_concat(data)
        fig = _scatter_figure(df)
        return (
            True,
            "success",
            f"{len(df)} pages Uploaded successfully!",
            fig,
            df.to_dict("records"),
            centralemb,
        )
    except Exception as e:
        # Callback boundary: report the problem in the alert instead of failing.
        return True, "danger", str(e), dash_no_update, None, None


@app.callback(
    Output("graph", "figure", allow_duplicate=True),
    [
        Input("reset-button", "n_clicks"),
        Input("intermediate-data", "data"),
    ],
    prevent_initial_call=True,
)
def reset(n_clicks, intermediate_data):
    """Redraw the plain scatter plot from the stored page records.

    Nothing is updated until the reset button has been clicked at least once
    and page records are available.
    """
    if not n_clicks or not intermediate_data:
        return dash_no_update
    return _scatter_figure(DataFrame(intermediate_data))
@app.callback(
    Output("graph", "figure", allow_duplicate=True),  # Enable duplicate output
    [
        Input("graph", "clickData"),
        Input("intermediate-data", "data"),
        Input("input-src", "value"),
    ],
    prevent_initial_call=True,
)
def on_click_point(click_data, intermediate_data, input_num):
    """Highlight the pages most similar to the clicked page.

    Args:
        click_data: ``clickData`` of the graph.
        intermediate_data: Page records held in the ``intermediate-data`` store.
        input_num: Number of nearest pages to mark as similar.

    Returns:
        A figure showing every page coloured by similar / not similar, the
        clicked page as a green marker and lines from it to each similar
        page; ``no_update`` when there is nothing to do.
    """
    if not (click_data and intermediate_data and input_num):
        return dash_no_update

    point = click_data['points'][0]
    # Only points that carry a hover name (the page id) can be looked up.
    if "hovertext" not in point:
        return dash_no_update
    # Points drawn by the free-text query are tagged "yes" and are skipped.
    # Not every trace sets customdata, so read it defensively.
    custom = point.get("customdata")
    if custom and custom[0] == "yes":
        return dash_no_update

    point_id = point['hovertext']
    df = DataFrame(intermediate_data)
    querydf = df[df["id"] == point_id]
    if querydf.empty:
        # The clicked id is not among the stored pages.
        return dash_no_update

    qx, qy, qz = querydf[["X", "Y", "Z"]].values[0]
    query_vector = querydf["embedding"].values[0]
    results = rank_vectors_cosine(df, query_vector, input_num)

    fig = px.scatter_3d(
        results,
        x="X",
        y="Y",
        z="Z",
        hover_data={"X": False, "Y": False, "Z": False, "similar": False, "similarity": True, "rank": True},
        hover_name="id",
        color="similar",
    )
    fig.add_trace(go.Scatter3d(
        x=[qx],
        y=[qy],
        z=[qz],
        mode="markers",
        marker=dict(size=12, color="green"),
        name="Query",
        hovertext=[point_id],
    ))

    similar_df = results[results["similar"] == "Similar"]
    similar_df = similar_df[["X", "Y", "Z"]].reset_index(drop=True)
    # Alternate query point / similar point so the line trace draws one
    # segment from the query to each neighbour.
    similar_df = repeat_pattern(similar_df, {"X": qx, "Y": qy, "Z": qz})
    fig2 = px.line_3d(similar_df, x="X", y="Y", z="Z", hover_data={"X": False, "Y": False, "Z": False})

    combined_fig = go.Figure(data=fig.data + fig2.data)
    combined_fig.update_layout(fig.layout)
    return combined_fig


@app.callback(
    Output("graph", "figure", allow_duplicate=True),
    [
        Input("query-button", "n_clicks"),
        Input("intermediate-data", "data"),
        Input("input-src", "value"),
    ],
    State("input-query", "value"),
    State("input-api", "value"),
    State("emb-data", "data"),
    State("langselect", "value"),
    prevent_initial_call=True,
)
def query(n_clicks_query, intermediate_data, input_num: int, query: str, api: str, embmodel: str, lang: str):
    """Embed a free-text query and highlight the pages most similar to it.

    Args:
        n_clicks_query: Click count of the "Ask Query" button.
        intermediate_data: Page records held in the ``intermediate-data`` store.
        input_num: Number of nearest pages to mark as similar.
        query: Query text.
        api: API key forwarded to ``get_embedding``.
        embmodel: Embedding model name held in the ``emb-data`` store.
        lang: Selected key of ``langs``.

    Returns:
        A figure with all pages re-projected together with the query, the
        query as a green marker and lines to each similar page;
        ``no_update`` when an input is missing or no embedding was obtained.
    """
    # Guard clauses instead of one eager all([...]): an empty field arrives
    # as None, and calling .strip() on it would raise.
    if not (n_clicks_query and intermediate_data and input_num and lang):
        return dash_no_update
    if not (query and query.strip()) or not (api and api.strip()):
        return dash_no_update

    df = DataFrame(intermediate_data)
    embs = df["embedding"].apply(np_array).tolist()
    queryemb = get_embedding(query, api, embmodel, langs[lang])
    if queryemb is None:
        # get_embedding yields None when the embedding request fails.
        return dash_no_update

    # Project pages and query together so they share one 3-D space.
    pcares = pca.fit_transform(embs + [queryemb])
    qx, qy, qz = pcares[-1]
    df[["X", "Y", "Z"]] = DataFrame(pcares[:-1], columns=["X", "Y", "Z"])

    results = rank_vectors_cosine(df, queryemb, input_num)
    # Tag every point so on_click_point ignores clicks on this figure.
    results["ansofquery"] = "yes"

    fig = px.scatter_3d(
        results,
        x="X",
        y="Y",
        z="Z",
        hover_data={"X": False, "Y": False, "Z": False, "similar": False, "similarity": True, "rank": True},
        hover_name="id",
        color="similar",
        custom_data=["ansofquery"],
    )
    fig.add_trace(go.Scatter3d(
        x=[qx],
        y=[qy],
        z=[qz],
        mode="markers",
        marker=dict(size=12, color="green"),
        name="Query",
        customdata=["yes"],
    ))

    similar_points = results[results["similar"] == "Similar"][["X", "Y", "Z"]].reset_index(drop=True)
    fig2 = px.line_3d(
        repeat_pattern(similar_points, {"X": qx, "Y": qy, "Z": qz}),
        x="X",
        y="Y",
        z="Z",
        hover_data={"X": False, "Y": False, "Z": False},
    )

    combined_fig = go.Figure(data=fig.data + fig2.data)
    # Keep the scatter layout, as on_click_point does.
    combined_fig.update_layout(fig.layout)
    return combined_fig


# Browser side: on a click, open the clicked page in the flip-book iframe.
# The hover text has the form "<book name>:<page>". Raw string so the regex
# backslash reaches the browser without being an invalid Python escape.
app.clientside_callback(
    r"""
    function (clickData) {
        if (clickData.points[0].hovertext) {
            const match = clickData.points[0].hovertext.match(/^(.*?):(\d+)$/);
            if (match) {
                data = {
                    bookName: match[1],
                    page: parseInt(match[2], 10)
                };
                if (window.current_book != data.bookName) {
                    framewin = document.getElementById("flip-book-iframe").contentWindow
                    framewin.DFLIP.openURL(window.blobs[data.bookName])
                    framewin.DFLIP.activeLightBox.closeButton.hide()
                }
                window.current_book = data.bookName;
                framewin.DFLIP.extrafeatures.gotoPage(data.page);
            }
        }
    }
    """,
    Input("graph", "clickData"),
    prevent_initial_call=True
)

# Browser side: turn every uploaded file into a blob URL, stored in
# window.blobs under the file name without its extension.
app.clientside_callback(
    """
    function (contentslist, filenameslist) {
        window.blobs = {};
        window.current_book = "";
        contentslist.forEach((element,index) => {
            const [contentType, base64Data] = element.split(',');
            const binaryString = atob(base64Data);
            const len = binaryString.length;
            const bytes = new Uint8Array(len);
            for (let i = 0; i < len; i++) {
                bytes[i] = binaryString.charCodeAt(i);
            }
            const blob = new Blob([bytes], { type: contentType.split(':')[1].split(';')[0] });
            const blobUrl = URL.createObjectURL(blob);
            const file = filenameslist[index];
            const filename = file.substring(0, file.lastIndexOf('.'));
            window.blobs[filename] = blobUrl;
            return;
        });
    }
    """,
    Input("button-upload-id", "contents"),
    State("button-upload-id", "filename"),
    prevent_initial_call=True,
)

if __name__ == '__main__':
    app.run(host="0.0.0.0", port=7860)