Spaces:
Runtime error
Runtime error
| import pdfplumber | |
| import streamlit as st | |
| import requests | |
| import json | |
| import redis | |
| import redis.commands.search | |
| from redis.commands.search.field import TagField, VectorField, TextField | |
| from redis.commands.search.indexDefinition import IndexDefinition, IndexType | |
| import logging | |
| from redis.commands.search.query import Query | |
| import numpy as np | |
| from typing import List, Dict, Any | |
| from utlis.constant import * | |
| from PIL import Image | |
| import google.generativeai as genai | |
# Configure the Gemini client from the environment rather than from a key
# committed to the source file, where anyone who can read the file can reuse it.
# NOTE(review): the key that used to be hard-coded on this line should be
# treated as compromised and rotated.
import os

genai.configure(api_key=os.environ.get("GOOGLE_API_KEY"))
| import base64 | |
| import sqlite3 | |
def initialize_session_state():
    """Populate ``st.session_state`` with a default for every key not yet set.

    Keys that are already present keep their current value; only missing
    keys are assigned.
    """
    defaults = {
        "doc_ortext": None,
        "token": "abcd",
        "service": None,
        "results_str": False,
        "service_slected_to_chat": False,
        "embdding_model": None,
        "indexing_method": None,
        "uploaded_files": None,
        "messages": [{"role": "assistant", "content": "How can I help you?"}],
    }
    for key, default in defaults.items():
        if key not in st.session_state:
            st.session_state[key] = default
def extract_text_from_pdf(pdf_path):
    """Return the text of every page of a PDF, concatenated in page order.

    Args:
        pdf_path: Passed unchanged to ``pdfplumber.open``.

    Returns:
        A single string with the layout-mode text of all pages joined
        without a separator; an empty string when no page yields text.
    """
    with pdfplumber.open(pdf_path) as pdf:
        page_texts = [
            page.extract_text(
                x_tolerance=2,
                y_tolerance=4,
                layout=True,
                x_density=5,
                y_density=10,
            )
            for page in pdf.pages
        ]
    # Defensive: treat a missing (None) page result as empty text so one
    # blank page cannot make the concatenation raise TypeError.
    return "".join(chunk or "" for chunk in page_texts)
def delete_service(token, service_slected_to_delete):
    """Delete each named service via ``REMOVE_SERVICE_API`` and report the result.

    One HTTP DELETE request with a JSON body is sent per service name. A
    Streamlit success message is shown when the reply's ``"success"`` field
    equals ``True``; otherwise an error message is shown. A failed request or
    an unparseable reply is reported as an error for that service and the
    remaining services are still processed.

    Args:
        token: Value sent in the ``"token"`` field of each request body.
        service_slected_to_delete: Iterable of service names to delete.
    """
    # The request targets a JSON API, so declare the body type explicitly.
    headers = {'Content-Type': 'application/json'}
    for service_name in service_slected_to_delete:
        payload = json.dumps({
            "token": token,
            "servicename": service_name
        })
        try:
            response = requests.delete(REMOVE_SERVICE_API, data=payload, headers=headers)
            result = json.loads(response.text)
        except (requests.RequestException, ValueError):
            # Network failure or a non-JSON body (json.JSONDecodeError is a
            # ValueError): report it below instead of aborting the whole loop.
            result = None
        # Strict comparison kept on purpose: truthy strings such as "false"
        # must not count as success.
        if isinstance(result, dict) and result.get("success") == True:  # noqa: E712
            st.success(f"{service_name} deleted successfully")
        else:
            st.error(f"{service_name} not deleted successfully")
def delete_document(token, service, document_slected_to_delete):
    """Delete documents from a service via ``REMOVE_DOCUMENTS_API``.

    All document names are sent in a single HTTP DELETE request with a JSON
    body. A Streamlit success message is shown when the reply's ``"status"``
    field is ``"success"``; any other reply, a failed request, or a reply
    that is not valid JSON results in an error message.

    Args:
        token: Value sent in the ``"token"`` field of the request body.
        service: Value sent in the ``"service_name"`` field.
        document_slected_to_delete: Document names sent as ``"document_names"``.
    """
    logging.debug(
        "Deleting documents %s from service %s", document_slected_to_delete, service
    )
    payload = json.dumps({
        "token": token,
        "service_name": service,
        "document_names": document_slected_to_delete
    })
    # The request targets a JSON API, so declare the body type explicitly.
    headers = {'Content-Type': 'application/json'}
    try:
        response = requests.delete(REMOVE_DOCUMENTS_API, data=payload, headers=headers)
        logging.debug("Delete-documents response: %s", response)
        result = json.loads(response.text)
    except (requests.RequestException, ValueError):
        # Network failure or a non-JSON body (json.JSONDecodeError is a
        # ValueError): surface it as a failed deletion instead of crashing.
        result = None
    if isinstance(result, dict) and result.get("status") == "success":
        st.success("document(s) deleted successfully")
    else:
        st.error("document(s) not deleted successfully")
def gemini_vision(file):
    """Ask the ``gemini-pro-vision`` model to extract the text shown in an image.

    Args:
        file: Passed unchanged to ``PIL.Image.open``.

    Returns:
        The ``text`` attribute of the model's response.
    """
    # Typo fixed ("fromt" -> "from") because this string is sent to the model.
    prompt = "please extract all text from this image"
    model = genai.GenerativeModel('gemini-pro-vision')
    # Keep the image open only for the duration of the request so the
    # underlying handle is released afterwards.
    with Image.open(file) as image:
        response = model.generate_content([prompt, image])
    return response.text
def add_service(token, servicename):
    """Register a service through ``ADD_SERVICES_API`` and show the outcome.

    Sends one HTTP POST with a JSON body. When the reply contains a truthy
    ``"added_services"`` entry a Streamlit success message is shown;
    otherwise the reply's ``"message"`` value is shown as an error.

    Args:
        token: Value sent in the ``"token"`` field of the request body.
        servicename: Name placed in the single entry of ``"services"``.
    """
    request_body = json.dumps({
        "token": token,
        "services": [{"servicename": servicename}],
    })
    reply = requests.post(
        ADD_SERVICES_API,
        data=request_body,
        headers={'Content-Type': 'application/json'},
    )
    parsed = json.loads(reply.text)
    if not parsed.get("added_services", None):
        st.error(parsed.get("message", None))
        return
    st.success(f"{servicename} added successfully")
def add_text_document(token, servicename):
    """Upload the text held in session state as a document via ``ADD_STORE_DOCUMENT``.

    Reads the text from ``st.session_state.text_area`` and the name from
    ``st.session_state.name_text_area``. In the name, every space, ``(``,
    ``)``, ``-`` and ``.`` is replaced by ``_``. The text is sent
    Base64-encoded (UTF-8) in the ``"file"`` field, and a Streamlit success
    or error message is shown depending on whether the reply's ``"status"``
    is ``"success"``.

    Args:
        token: Value sent in the ``"token"`` field of the request body.
        servicename: Value sent in the ``"service_name"`` field.
    """
    raw_text = st.session_state.text_area
    # One translation pass maps each unwanted character to an underscore.
    to_underscore = str.maketrans(dict.fromkeys(" ()-.", "_"))
    document_name = st.session_state.name_text_area.translate(to_underscore)

    text_bytes = raw_text.encode('utf-8')
    encoded_text = base64.b64encode(text_bytes).decode('utf-8')

    # Assumes the API accepts Base64-encoded text under the 'file' key.
    body = json.dumps({
        "token": token,
        "service_name": servicename,
        "document_name": document_name,
        "file": encoded_text,
    })
    response = requests.post(
        ADD_STORE_DOCUMENT,
        data=body,
        headers={'Content-Type': 'application/json'},
    )

    if json.loads(response.text).get("status") != "success":
        st.error(f"{document_name} not uploaded successfully")
    else:
        st.success(f"{document_name} uploaded successfully as text")
def add_document(token, servicename):
    """Upload every file in ``st.session_state.uploaded_files`` via ``ADD_STORE_DOCUMENT``.

    Each file is sent in its own HTTP POST with a JSON body whose ``"file"``
    field holds the Base64-encoded file content. The stored document name is
    the file name with spaces removed and ``(``, ``)``, ``-`` and ``.``
    replaced by ``_``. A Streamlit success or error message is shown per
    file, depending on whether the reply's ``"status"`` is ``"success"``.
    Nothing is sent when no files have been uploaded.

    Args:
        token: Value sent in the ``"token"`` field of each request body.
        servicename: Value sent in the ``"service_name"`` field.
    """
    # The session key may still hold None before any upload; treat that as
    # "no files" rather than failing on iteration.
    files = st.session_state.uploaded_files or []
    headers = {'Content-Type': 'application/json'}
    # Single pass over the name: drop spaces, map the other characters to "_".
    name_table = str.maketrans({" ": None, "(": "_", ")": "_", "-": "_", ".": "_"})
    for file in files:
        document_name = file.name.translate(name_table)
        encoded_file = base64.b64encode(file.read()).decode('utf-8')
        payload = json.dumps({
            "token": token,
            "service_name": servicename,
            "document_name": document_name,
            "file": encoded_file
        })
        response = requests.post(ADD_STORE_DOCUMENT, data=payload, headers=headers)
        # The user-facing message uses the file name with spaces turned into
        # underscores, which differs from the stored document name.
        display_name = file.name.replace(" ", "_")
        if json.loads(response.content).get("status") == "success":
            st.success(f"{display_name} added successfully")
        else:
            st.error(f"{display_name} not added successfully")
def search_document(index_name, token, service_name, query, top_k):
    """Run a search through ``SEARCH_API`` and return the raw reply body.

    Prints the endpoint URL, then sends one HTTP POST with a JSON body built
    from the arguments.

    Args:
        index_name: Value sent in the ``"index_name"`` field.
        token: Value sent in the ``"token"`` field.
        service_name: Value sent in the ``"service_name"`` field.
        query: Value sent in the ``"query"`` field.
        top_k: Value sent in the ``"top_k"`` field.

    Returns:
        ``response.content`` of the HTTP reply, unparsed.
    """
    endpoint = SEARCH_API
    print(endpoint)
    body = json.dumps({
        "index_name": index_name,
        "token": token,
        "service_name": service_name,
        "query": query,
        "top_k": top_k,
    })
    reply = requests.post(
        endpoint,
        data=body,
        headers={'Content-Type': 'application/json'},
    )
    return reply.content