# NOTE: the text "Spaces: / Sleeping / Sleeping" that preceded this file was
# HuggingFace Space status output captured by the scraper, not program source.
| from dash import Dash, dcc, html, Input, Output, State,no_update as dash_no_update | |
| from pandas import DataFrame , concat as pd_concat | |
| from pymupdf import open as pdf_open | |
| from json import loads as json_loads | |
| from sklearn.decomposition import PCA | |
| from numpy import array as np_array, ndarray,where as np_where | |
| from base64 import b64decode | |
| from scipy.spatial.distance import cosine | |
| from requests import Session | |
| from re import search as re_search | |
| import plotly.express as px | |
| import plotly.graph_objs as go | |
| import dash_bootstrap_components as dbc | |
# Map of display language name -> Google language code, loaded once at startup
# and used for the dropdown options and transliteration/translation calls.
with open("langs.json") as langs_file:  # context manager: the old open(...).read() leaked the file handle
    langs: dict = json_loads(langs_file.read())
# Dash app using a local sketch-style Bootstrap theme from the assets folder.
app = Dash(__name__, external_stylesheets=["assets/bootstrap_sketch.css"])

# Single-column UI: upload -> API key -> query language -> 3D embedding graph
# -> flip-book PDF viewer -> reset -> neighbour count -> free-text query.
# NOTE(review): no @app.callback decorators are visible in this file for the
# Python callback functions below — likely lost in extraction; verify wiring.
app.layout = html.Div([
    # PDF upload button; multiple files allowed, PDFs only.  The contents are
    # also consumed by a clientside callback that builds blob URLs per book.
    dcc.Upload(dbc.Button('Upload File', style={'textAlign': 'center', "width": "100%"}), id="button-upload-id", multiple=True, accept=".pdf"),
    html.Hr(),
    # API key input (presumably fed to get_embedding's Bearer auth — wiring not visible here).
    dbc.InputGroup([dbc.InputGroupText("API"), dbc.Input(id="input-api", type="text", placeholder="Enter a API KEY", value="", style={"textAlign": "center"})]),
    html.Hr(),
    # Source language of typed queries; options are the keys of langs.json.
    dcc.Dropdown(id="langselect", options=[i for i in langs.keys()], placeholder="Language", style={"textAlign": "center"}),
    # Transient status/error messages (auto-dismiss after 2 s).
    dbc.Alert(id='alert-id', is_open=False, duration=2000),
    # 3D scatter of PCA-projected page embeddings.
    dcc.Graph(id="graph"),
    html.Hr(),
    # Page-flip PDF viewer (DFLIP-based, per the clientside callbacks below).
    html.Iframe(id="flip-book-iframe", src="assets/index.html", style={"width": "100%", "height": "66vh"}),
    html.Hr(),
    # Restores the plain per-book scatter after a similarity/query overlay.
    dbc.Button("Reset", id="reset-button", n_clicks=0, style={"width": "100%"}),
    html.Hr(),
    # k: how many nearest pages get highlighted as "Similar".
    dbc.InputGroup([dbc.InputGroupText("Src Number"), dbc.Input(id="input-src", type="number", placeholder="Enter a number of nearest data points", value=10, style={"textAlign": "center"})]),
    html.Hr(),
    # Free-text query box; the text is transliterated/translated then embedded.
    dbc.InputGroup([dbc.Input(id="input-query", type="text", placeholder="Enter a Query", value="Dhyaan shu chhe", style={"textAlign": "center"}), dbc.Button("Ask Query", id="query-button", n_clicks=0)]),
    # Uploaded page records (DataFrame.to_dict("records")) shared by callbacks.
    dcc.Store(id="intermediate-data"),
    # Name of the embedding model common to all uploaded PDFs.
    dcc.Store(id="emb-data")
])
# In-memory cache of computed query embeddings, keyed by
# (text, srclang, model_name) tuples — see get_embedding.
embcache = {}
# Shared PCA projecting embeddings to 3 components; re-fit on every book
# upload (book_to_df) and on every free-text query.
pca = PCA(n_components=3)
# Persistent HTTP sessions (connection reuse), one per external service:
transsession = Session()     # Google Translate
langconvsession = Session()  # Google Input Tools transliteration
embsession = Session()       # HuggingFace Inference API
def book_to_df(embedding_metadata, name: str) -> DataFrame:
    """Turn one book's per-page embedding metadata into a plot-ready frame.

    Fits the shared 3-component PCA on this book's embeddings and returns a
    DataFrame with X/Y/Z coordinates, the page texts, the raw embedding
    vectors, the book name and a "<name>:<page>" id (pages numbered from 1).
    """
    vectors = [np_array(entry["embedding"]) for entry in embedding_metadata]
    frame = DataFrame(pca.fit_transform(vectors), columns=["X", "Y", "Z"])
    frame["maintext"] = [entry["maintext"] for entry in embedding_metadata]
    frame["text"] = [entry["text"] for entry in embedding_metadata]
    frame["embedding"] = vectors
    frame["bookname"] = name
    frame["id"] = [f"{name}:{page}" for page in range(1, len(vectors) + 1)]
    return frame
def rank_vectors_cosine(vector_store: DataFrame, query_vector: ndarray, k: int = 10) -> DataFrame:
    """Rank rows of *vector_store* by cosine similarity to *query_vector*.

    Returns a new DataFrame sorted by descending similarity with added
    columns: 'similarity', 'similar' ("Similar" for the top *k* rows,
    "Not similar" otherwise) and 1-based 'rank'.  Rows whose similarity is
    exactly 1.0 (the query point itself) are dropped.
    """
    # BUGFIX: work on a copy — the original wrote the 'similarity' column
    # straight into the caller's DataFrame as a side effect.
    ranked = vector_store.copy()
    ranked['similarity'] = ranked['embedding'].apply(lambda vec: 1 - cosine(query_vector, vec))
    ranked = ranked[ranked['similarity'] != 1.0]
    ranked = ranked.sort_values(by="similarity", ascending=False).reset_index(drop=True)
    ranked['similar'] = np_where(ranked.index < k, "Similar", "Not similar")
    ranked['rank'] = ranked.index + 1
    return ranked
def repeat_pattern(df: DataFrame, repeat_value):
    """Interleave a fixed per-column value before every row of *df*.

    For each column c the output is [repeat_value[c], v0, repeat_value[c],
    v1, ...] — used to draw separate 3D line segments from one query point
    to each neighbour.
    """
    interleaved = {
        column: [item
                 for value in df[column].values
                 for item in (repeat_value[column], value)]
        for column in df.columns
    }
    return DataFrame(interleaved)
def get_embedding(text: str, hfapi: str, model_name: str, srclang: str) -> ndarray:
    """Translate *text* to English and embed it via the HF Inference API.

    Latin-script input is first transliterated into *srclang* script via
    Google Input Tools, then translated to English via Google Translate,
    and the English text is embedded by *model_name*.  Returns None when
    the HF request fails; results are memoised in the module-level embcache.
    """
    # BUGFIX: the cache was probed with (text, srclang, model_name) but read
    # back with bare `text` (KeyError on every hit), and it was written under
    # the *translated* text so the original query string never hit.  Compute
    # the key from the caller's text before any rewriting, and use it for
    # probe, read and write alike.
    cache_key = (text, srclang, model_name)
    if cache_key in embcache:
        return embcache[cache_key]
    if re_search(r"[A-Za-z]", text):  # transliterate romanised input into srclang script
        text = langconvsession.get(f"https://inputtools.google.com/request?itc={srclang}-t-i0-und&num=1&text={text}").json()[1][0][1][0]
    # Translate to English, joining the sentence fragments Google returns.
    text = "".join([i[0] for i in transsession.get(f'https://translate.googleapis.com/translate_a/single?client=gtx&sl={srclang}&tl=en&dt=t&q={text}').json()[0]])
    response = embsession.post("https://api-inference.huggingface.co/models/"+model_name, headers={"Authorization": "Bearer "+hfapi}, json={"inputs": text})
    if response.status_code != 200:
        # Bad key / model cold-start / rate limit: signal failure to the caller.
        return None
    embedding = np_array(response.json())
    embcache[cache_key] = embedding
    return embedding
def pdf_upload(contents: list[str], filenames: list[str]):
    """Parse uploaded PDFs and build the combined embedding scatter plot.

    Every PDF must carry an embedded "embedding_metadata" file whose
    description ends in ":<model>"; all uploads must name the same model.
    Returns (alert_open, alert_color, alert_text, figure, records, model);
    on any failure the current figure is kept and records/model are None.
    NOTE(review): @app.callback wiring is not visible in this file — verify.
    """
    try:
        frames = []
        shared_model = ""
        for content, filename in zip(contents, filenames):
            # dcc.Upload delivers "data:<mime>;base64,<payload>" strings.
            pdf = pdf_open(stream=b64decode(content.split(',')[1]), filetype="pdf")
            name = filename.split(".")[0]
            if "embedding_metadata" not in pdf.embfile_names():
                raise Exception("No embedding metadata found in {} PDF file.".format(name))
            model = pdf.embfile_info("embedding_metadata")["description"].split(":", 1)[1]
            if shared_model != "" and shared_model != model:
                raise Exception("Embedding Model is not matched")
            shared_model = model
            frames.append(book_to_df(json_loads(pdf.embfile_get("embedding_metadata")), name))
        df = pd_concat(frames)
        fig = px.scatter_3d(df, x="X", y="Y", z="Z", title="3D Scatter Plot of RAG System Embeddings", hover_data={"X": False, "Y": False, "Z": False, "bookname": False}, hover_name="id", color="bookname")
        return True, "success", "{} pages Uploaded successfully!".format(len(df)), fig, df.to_dict("records"), shared_model
    except Exception as e:
        # Surface any failure as a danger alert; leave the figure untouched.
        return True, "danger", str(e), dash_no_update, None, None
def reset(n_clicks, intermediate_data):
    """Redraw the plain per-book 3D scatter, discarding any query overlay."""
    if not n_clicks or not intermediate_data:
        return dash_no_update
    frame = DataFrame(intermediate_data)
    return px.scatter_3d(
        frame, x="X", y="Y", z="Z",
        title="3D Scatter Plot of RAG System Embeddings",
        hover_data={"X": False, "Y": False, "Z": False, "bookname": False},
        hover_name="id", custom_data=["maintext", "text"], color="bookname",
    )
def on_click_point(click_data, intermediate_data, input_num):
    """Highlight the *input_num* pages nearest (cosine) to a clicked point.

    Rebuilds the scatter with "Similar"/"Not similar" colouring, marks the
    clicked point in green and draws lines from it to each similar point.
    Returns dash.no_update whenever the click cannot be handled.
    """
    if not all([click_data, intermediate_data, input_num, input_num != 0]):
        return dash_no_update
    point = click_data['points'][0]
    if "hovertext" not in point:
        return dash_no_update
    # BUGFIX: figures built without custom_data (the upload scatter, and the
    # green query marker added below) made point["customdata"][0] raise
    # KeyError; use .get() so only query-answer points ("yes") are skipped.
    if point.get("customdata", [None])[0] == "yes":
        return dash_no_update
    point_id = point['hovertext']  # renamed from `id`, which shadowed the builtin
    df = DataFrame(intermediate_data)
    querydf = df[df["id"] == point_id]
    qx, qy, qz = querydf[["X", "Y", "Z"]].values[0]
    query_vec = querydf["embedding"].values[0]
    results = rank_vectors_cosine(df, query_vec, input_num)
    fig = px.scatter_3d(results, x="X", y="Y", z="Z", hover_data={"X": False, "Y": False, "Z": False, "similar": False, "similarity": True, "rank": True}, hover_name="id", color="similar")
    # Green marker for the clicked (query) point itself.
    fig.add_trace(go.Scatter3d(x=[qx], y=[qy], z=[qz], mode="markers", marker=dict(size=12, color="green"), name="Query", hovertext=[point_id]))
    # Line segments query -> neighbour: interleave the query coords per row.
    similar_df = results[results["similar"] == "Similar"]
    similar_df = similar_df[["X", "Y", "Z"]].reset_index(drop=True)
    similar_df = repeat_pattern(similar_df, {"X": qx, "Y": qy, "Z": qz})
    fig2 = px.line_3d(similar_df, x="X", y="Y", z="Z", hover_data={"X": False, "Y": False, "Z": False})
    combined_fig = go.Figure(data=fig.data + fig2.data)
    combined_fig.update_layout(fig.layout)
    return combined_fig
def query(n_clicks_query, intermediate_data, input_num: int, query: str, api: str, embmodel: str, lang: str):
    """Embed a free-text query and highlight its nearest pages in 3D.

    The query text is embedded via get_embedding, PCA is re-fit over all
    page embeddings plus the query so everything shares one 3D space, then
    the figure mirrors on_click_point's: ranked scatter, green query marker
    (tagged customdata "yes") and lines to the similar points.
    """
    # input_num added to the guard: rank_vectors_cosine needs a usable k.
    if not all([n_clicks_query, intermediate_data, query, lang, input_num, query.strip() != "", api.strip() != ""]):
        return dash_no_update
    df = DataFrame(intermediate_data)
    embs = df["embedding"].apply(np_array).tolist()
    queryemb = get_embedding(query, api, embmodel, langs[lang])
    # BUGFIX: get_embedding returns None when the HF call fails (non-200);
    # previously that None crashed pca.fit_transform below.
    if queryemb is None:
        return dash_no_update
    pcares = pca.fit_transform(embs + [queryemb])
    qx, qy, qz = pcares[-1]  # last row is the query's projection
    df[["X", "Y", "Z"]] = DataFrame(pcares[:-1], columns=["X", "Y", "Z"])
    results = rank_vectors_cosine(df, queryemb, input_num)
    results["ansofquery"] = "yes"  # lets on_click_point ignore these points
    fig = px.scatter_3d(results, x="X", y="Y", z="Z", hover_data={"X": False, "Y": False, "Z": False, "similar": False, "similarity": True, "rank": True}, hover_name="id", color="similar", custom_data=["ansofquery"])
    fig.add_trace(go.Scatter3d(x=[qx], y=[qy], z=[qz], mode="markers", marker=dict(size=12, color="green"), name="Query", customdata=["yes"]))
    fig2 = px.line_3d(repeat_pattern(results[results["similar"] == "Similar"][["X", "Y", "Z"]].reset_index(drop=True), {"X": qx, "Y": qy, "Z": qz}), x="X", y="Y", z="Z", hover_data={"X": False, "Y": False, "Z": False})
    return go.Figure(data=fig.data + fig2.data)
# Clientside callback: on a graph click, parse the point's hovertext id
# ("<book>:<page>"), open that book's blob URL in the DFLIP viewer inside the
# iframe (only when switching books) and jump to the clicked page.
# NOTE(review): no Output is declared although Dash callbacks normally require
# one — possibly lost when this file was extracted; verify against the repo.
# NOTE(review): `framewin` is an implicit JS global assigned only in the
# book-switch branch; same-book clicks reuse the stale global — confirm.
app.clientside_callback(
    """
    function (clickData) {
        if (clickData.points[0].hovertext) {
            const match = clickData.points[0].hovertext.match(/^(.*?):(\d+)$/);
            if (match) {
                data = {
                    bookName: match[1],
                    page: parseInt(match[2], 10)
                };
                if (window.current_book != data.bookName) {
                    framewin = document.getElementById("flip-book-iframe").contentWindow
                    framewin.DFLIP.openURL(window.blobs[data.bookName])
                    framewin.DFLIP.activeLightBox.closeButton.hide()
                }
                window.current_book = data.bookName;
                framewin.DFLIP.extrafeatures.gotoPage(data.page);
            }
        }
    }
    """,
    Input("graph", "clickData"),
    prevent_initial_call=True
)
# Clientside callback: when PDFs are uploaded, decode each base64 payload into
# a Blob, store an object URL per file in window.blobs (keyed by the filename
# without its extension, matching the "id" prefix used by the graph), and
# reset the "currently open book" marker.
# NOTE(review): no Output is declared although Dash callbacks normally require
# one — possibly lost when this file was extracted; verify against the repo.
app.clientside_callback(
    """
    function (contentslist, filenameslist) {
        window.blobs = {};
        window.current_book = "";
        contentslist.forEach((element,index) => {
            const [contentType, base64Data] = element.split(',');
            const binaryString = atob(base64Data);
            const len = binaryString.length;
            const bytes = new Uint8Array(len);
            for (let i = 0; i < len; i++) {
                bytes[i] = binaryString.charCodeAt(i);
            }
            const blob = new Blob([bytes], { type: contentType.split(':')[1].split(';')[0] });
            const blobUrl = URL.createObjectURL(blob);
            const file = filenameslist[index];
            const filename = file.substring(0, file.lastIndexOf('.'));
            window.blobs[filename] = blobUrl;
            return;
        });
    }
    """,
    Input("button-upload-id", "contents"),
    State("button-upload-id", "filename"),
    prevent_initial_call=True,
)
if __name__ == '__main__':
    # Listen on all interfaces; 7860 is the conventional HuggingFace Spaces port.
    app.run(host="0.0.0.0",port=7860)