Spaces:
Sleeping
Sleeping
| import pdfplumber | |
| import streamlit as st | |
| import requests | |
| import json | |
| import redis | |
| import redis.commands.search | |
| from redis.commands.search.field import TagField, VectorField, TextField | |
| from redis.commands.search.indexDefinition import IndexDefinition, IndexType | |
| import logging | |
| from redis.commands.search.query import Query | |
| import numpy as np | |
| from typing import List, Dict, Any | |
| from semantic_text_splitter import TextSplitter | |
| from tokenizers import Tokenizer | |
| from sentence_transformers import SentenceTransformer | |
| from utlis.constant import * | |
| from PIL import Image | |
| import google.generativeai as genai | |
import os

# SECURITY: the Gemini API key is read from the GOOGLE_API_KEY environment
# variable and must never be committed to source control. Any key that was
# ever embedded in this file has to be treated as leaked and rotated.
genai.configure(api_key=os.environ.get("GOOGLE_API_KEY"))
def initialize_session_state():
    """Seed ``st.session_state`` with defaults for any key that is missing.

    Keys that are already present keep their current value, so repeated
    calls never overwrite existing state.
    """
    # Key names (including their spelling) are reproduced exactly because
    # they are looked up by name.
    defaults = (
        ("token", "abcd"),
        ("service", None),
        ("use_document", False),
        ("flag", False),
        ("embdding_model", None),
        ("indexing_method", None),
        ("uploaded_files", None),
        ("messages", [{"role": "assistant", "content": "How can I help you?"}]),
    )
    for key, default in defaults:
        if key not in st.session_state:
            st.session_state[key] = default
def extract_text_from_pdf(pdf_path):
    """Return the layout-preserving text of every page of a PDF, concatenated.

    Args:
        pdf_path: Whatever ``pdfplumber.open`` accepts (a path or an open
            file object).

    Returns:
        The text of all pages joined in page order without a separator;
        ``""`` when no page yields any text.
    """
    with pdfplumber.open(pdf_path) as pdf:
        return "".join(
            page.extract_text(
                x_tolerance=2,
                y_tolerance=4,
                layout=True,
                x_density=5,
                y_density=10,
            )
            # A page with no extractable text must not break the join
            # (some pdfplumber releases return None instead of "").
            or ""
            for page in pdf.pages
        )
def delete_service(token, service_slected_to_delete):
    """Ask the backend to remove each selected service and report the outcome.

    One POST request is sent to ``REMOVE_SERVICE_API`` per service name. A
    Streamlit success message is shown when the reply's ``success`` field is
    true; any other reply, a network failure, or a non-JSON body is reported
    as an error for that service and the loop continues with the next one.

    Args:
        token: User token sent as the ``token`` field of the request.
        service_slected_to_delete: Iterable of service names to remove.
    """
    headers = {'Content-Type': 'application/json'}
    for service_name in service_slected_to_delete:
        payload = json.dumps({"token": token, "servicename": service_name})
        try:
            # The timeout keeps a stalled backend from hanging the UI forever.
            response = requests.post(
                REMOVE_SERVICE_API, data=payload, headers=headers, timeout=30
            )
            result = json.loads(response.text)
        except (requests.RequestException, ValueError):
            # Network failure or a body that is not valid JSON.
            result = None

        # Equality (not identity) is kept so the accepted values are unchanged.
        if isinstance(result, dict) and result.get("success") == True:  # noqa: E712
            st.success(f"{service_name} deleted successfully")
        else:
            st.error(f"{service_name} not deleted successfully")
def delete_document(token, service, document_slected_to_delete):
    """Ask the backend to remove each selected document from a service.

    One POST request is sent to ``REMOVE_DOCUMENT_API`` per document name. A
    Streamlit success message is shown when the reply's ``status`` field
    equals ``"success"``; any other reply, a network failure, or a non-JSON
    body is reported as an error for that document and the loop continues.

    Args:
        token: User token sent as the ``token`` field of the request.
        service: Name of the service that owns the documents.
        document_slected_to_delete: Iterable of document names to remove.
    """
    headers = {'Content-Type': 'application/json'}
    for document_name in document_slected_to_delete:
        payload = json.dumps({
            "token": token,
            "servicename": service,
            "documentname": document_name,
        })
        try:
            # The timeout keeps a stalled backend from hanging the UI forever.
            response = requests.post(
                REMOVE_DOCUMENT_API, data=payload, headers=headers, timeout=30
            )
            result = json.loads(response.text)
        except (requests.RequestException, ValueError):
            # Network failure or a body that is not valid JSON.
            result = None

        if isinstance(result, dict) and result.get("status") == "success":
            st.success(f"{document_name} deleted successfully")
        else:
            st.error(f"{document_name} not deleted successfully")
def gemini_vision(file):
    """Extract the text contained in an image using the Gemini vision model.

    Args:
        file: Path or file-like object accepted by ``PIL.Image.open``.

    Returns:
        The text reported by the model, or ``""`` when the reply carries no
        usable text, so callers can treat the image as "no text extracted"
        instead of crashing.
    """
    load_image = Image.open(file)
    prompt = "please extract all text fromt this image"
    model = genai.GenerativeModel('gemini-pro-vision')
    response = model.generate_content([prompt, load_image])
    try:
        return response.text
    except ValueError:
        # The ``text`` accessor raises ValueError when the reply has no
        # single text part (for example when the request was blocked).
        return ""
def add_service(token, servicename, embdding_model):
    """Register a new service with the backend and report the outcome.

    Sends one POST request to ``ADD_SERVICES_API``. A Streamlit success
    message is shown when the reply contains a non-empty ``added_services``
    field; otherwise the raw reply body is shown as an error. Network
    failures and non-JSON replies are reported as errors instead of raising.

    Args:
        token: User token sent as the ``token`` field of the request.
        servicename: Name of the service to create.
        embdding_model: Model name sent as the service's ``modelname``.
    """
    payload = json.dumps({
        "token": token,
        "services": [
            {
                "servicename": servicename,
                "modelname": embdding_model,
            }
        ],
    })
    headers = {'Content-Type': 'application/json'}
    try:
        # The timeout keeps a stalled backend from hanging the UI forever.
        response = requests.post(
            ADD_SERVICES_API, data=payload, headers=headers, timeout=30
        )
    except requests.RequestException as exc:
        st.error(f"{servicename} could not be added: {exc}")
        return

    try:
        result = json.loads(response.text)
    except ValueError:
        # Body is not valid JSON; fall through and show it verbatim.
        result = None

    if isinstance(result, dict) and result.get("added_services"):
        st.success(f"{servicename} added successfully")
    else:
        st.error(response.text)
def add_document(token, servicename):
    """Extract text from every uploaded file and store it in the backend.

    Files are read from ``st.session_state.uploaded_files``. PDFs go through
    ``extract_text_from_pdf``; every other file type is treated as an image
    and sent to ``gemini_vision``. The extracted text is posted to
    ``CHUNK_STORE_API`` and a Streamlit success/error message is shown per
    file. Network failures and non-JSON replies are reported as errors for
    that file instead of aborting the whole batch.

    Args:
        token: User token sent as the ``user_id`` field of the request.
        servicename: Service that the documents are stored under.
    """
    # ``uploaded_files`` is initialised to None, so guard before iterating.
    uploaded_files = st.session_state.uploaded_files or []
    headers = {'Content-Type': 'application/json'}
    # Single pass over the name: drop spaces, map "(", ")" and "-" to "_".
    name_table = str.maketrans("()-", "___", " ")

    for file in uploaded_files:
        if file.type.split('/')[-1] == 'pdf':
            text = extract_text_from_pdf(file)
        else:
            text = gemini_vision(file)

        if not text:
            st.error("we can't extract text from {}".format(file.name))
            continue

        payload = json.dumps({
            "text": text,
            "document_name": file.name.translate(name_table),
            "user_id": token,
            "service_name": servicename,
        })
        try:
            # The timeout keeps a stalled backend from hanging the UI forever.
            response = requests.post(
                CHUNK_STORE_API, data=payload, headers=headers, timeout=60
            )
            result = json.loads(response.text)
        except (requests.RequestException, ValueError):
            # Network failure or a body that is not valid JSON.
            result = None

        # The user-facing message uses a lighter normalisation of the file
        # name than the stored document name (spaces become underscores).
        display_name = file.name.replace(" ", "_")
        # Equality (not identity) is kept so the accepted values are unchanged.
        if isinstance(result, dict) and result.get("success") == True:  # noqa: E712
            st.success(f"{display_name} uploaded successfully")
        else:
            st.error(f"{display_name} not uploaded successfully")
def get_context(prompt, token, service_name, top_k):
    """Fetch the text chunks most relevant to a prompt from the search API.

    The documents searched are those listed in
    ``st.session_state.doument_slected_to_chat``.

    Args:
        prompt: Query string sent as ``query_str``.
        token: User token sent as ``userid``.
        service_name: Service whose documents are searched.
        top_k: Value sent as the request's ``top_k`` field.

    Returns:
        A list with the ``chunk`` value of every entry in the reply's
        ``results`` field; an empty list when there are no results, the
        request fails, or the reply is not a JSON object.
    """
    payload = json.dumps({
        "userid": token,
        "service_name": service_name,
        "query_str": prompt,
        "document_names": st.session_state.doument_slected_to_chat,
        "top_k": top_k,
    })
    headers = {'Content-Type': 'application/json'}
    try:
        # The timeout keeps a stalled backend from hanging the UI forever.
        response = requests.post(
            SEARCH_API, data=payload, headers=headers, timeout=30
        )
        body = json.loads(response.text)
    except (requests.RequestException, ValueError):
        # Network failure or a body that is not valid JSON: no context.
        return []

    results = body.get("results") if isinstance(body, dict) else None
    if not results:
        return []
    return [chunk['chunk'] for chunk in results]
def query(payload):
    """POST ``payload`` as JSON to ``API_URL`` and return the decoded reply.

    Args:
        payload: JSON-serialisable object sent as the request body.

    Returns:
        The reply body decoded from JSON.

    Raises:
        requests.RequestException: On network failure or timeout, or when the
            reply body is not valid JSON (depending on the installed requests
            version the latter may surface as a plain ``ValueError``).
    """
    # Without a timeout requests waits indefinitely on a stalled server.
    response = requests.post(API_URL, headers=HEADERS, json=payload, timeout=120)
    return response.json()
def generate_response(llm_name, question, context = None):
    """Ask the chat backend to answer a question, optionally with context.

    Args:
        llm_name: Model identifier sent as ``model_name``.
        question: The user's question.
        context: Optional context sent alongside the question; ``None`` is
            sent as JSON ``null``.

    Returns:
        The reply's ``response`` field. When that field is missing the
        quota-exceeded fallback text is returned; when the request itself
        fails or the reply is not a JSON object, a short description of the
        failure is returned so the chat UI always has a string to show.
    """
    payload = json.dumps({
        "context": context,
        "question": question,
        "model_name": llm_name,
    })
    headers = {'Content-Type': 'application/json'}
    try:
        # Generation can be slow, so allow a generous but finite wait.
        response = requests.post(
            CHAT_API, data=payload, headers=headers, timeout=120
        )
        body = json.loads(response.text)
    except requests.RequestException as exc:
        return f"Chat service request failed: {exc}"
    except ValueError:
        return "Chat service returned an invalid (non-JSON) response."

    if not isinstance(body, dict):
        return "Chat service returned an invalid (non-JSON) response."
    return body.get("response", "429 Quota exceeded for quota metric.")