# NOTE(review): lines below were scraped from a HuggingFace Spaces page
# ("Spaces: Sleeping" status banner removed); table-pipe artifacts cleaned.
import json
import os
import re
import shutil

import faiss
import fitz  # PyMuPDF
import numpy as np
import requests
import tensorflow as tf
from flask import Flask, jsonify, render_template, request
from flask_cors import CORS
from sentence_transformers import SentenceTransformer
from werkzeug.utils import secure_filename
# Flask application object; CORS is opened up so the separate front end
# (served from another origin) can call the JSON endpoints.
app = Flask(__name__)
CORS(app)  # Enable CORS for all routes
# NOTE(review): the route decorator appears to have been lost in extraction —
# without it this view is never registered. Path '/' assumed; confirm.
@app.route('/')
def index():
    """Serve the single-page front end (templates/index.html)."""
    return render_template('index.html')
# Folder where the uploaded PDF is stored (wiped and recreated per upload).
UPLOAD_FOLDER = 'uploads'
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER

# Sentence-embedding model shared by indexing and querying.
model = SentenceTransformer('all-MiniLM-L6-v2')

# On-disk locations of the FAISS index and its parallel metadata list.
index_path = 'vector_index1.faiss'
metadata_path = 'metadata1.json'

# In-memory accumulators filled while processing a PDF;
# embeddings[i] corresponds to metadata[i].
embeddings = []
metadata = []
def tensor_to_string(tensor):
    """Decode a scalar string tensor into a plain Python ``str`` (UTF-8)."""
    raw_bytes = tensor.numpy()
    return raw_bytes.decode("utf-8")
def extract_text_from_pdf_with_page_numbers(pdf_path):
    """Return a list of ``(page_number, text)`` tuples for the given PDF.

    Page numbers are 1-based. Fix: the fitz document handle is now closed
    (previously it leaked — one open handle per processed PDF).
    """
    doc = fitz.open(pdf_path)
    try:
        text_pages = []
        for page_num in range(len(doc)):
            page = doc.load_page(page_num)
            text_pages.append((page_num + 1, page.get_text()))
        return text_pages
    finally:
        doc.close()
def custom_standardization(input_data):
    """Normalize a scalar tf.string tensor to cleaned plain text.

    Returns "" for index/TOC-like entries (a run of three or more dots);
    otherwise strips URLs, e-mails, angle-bracketed markup, square brackets,
    digit-bearing words and non-alphabetic characters, then collapses
    whitespace. Requires eager execution — ``.numpy()`` is called directly.
    """
    index_pattern = re.compile(r'\.{3,}')
    # TOC/index entries ("Chapter 1 ....... 12") are dropped entirely.
    if bool(index_pattern.search(input_data.numpy().decode('utf-8'))):
        return ""
    stripped_urls = tf.strings.regex_replace(input_data, r"https?://\S+|www\.\S+", "")
    stripped_emails = tf.strings.regex_replace(stripped_urls, r"\S+@\S+", "")
    stripped_brackets = tf.strings.regex_replace(stripped_emails, r"<.*?>", "")
    stripped_square_brackets = tf.strings.regex_replace(stripped_brackets, r"\[|\]", "")
    # Any word containing a digit is removed wholesale, not just the digits.
    stripped_digits = tf.strings.regex_replace(stripped_square_brackets, r"\w*\d\w*", "")
    stripped_non_alpha = tf.strings.regex_replace(stripped_digits, r"[^a-zA-Z\s]", "")
    standardized_text = tf.strings.regex_replace(stripped_non_alpha, r"\s+", " ")
    return standardized_text.numpy().decode('utf-8')
def split_into_paragraphs(text):
    """Split one page's text into paragraph chunks.

    A chunk boundary sits just after a newline when the next line starts with
    digits, or at a blank-line gap; empty/whitespace-only chunks are dropped.
    """
    chunks = re.split(r'(?<=\n)(?=\d+|(?=\n\s*\n))', text)
    return [chunk.strip() for chunk in chunks if chunk.strip()]
def text_to_vectors(paragraphs):
    """Encode a list of paragraph strings with the shared sentence model."""
    return model.encode(paragraphs)
def split_into_qa(text):
    """Split a paragraph into a ``(question, answer)`` pair.

    The question is everything up to the last '?' before the first newline
    that follows it; the remainder is the answer. Spans that look like
    table-of-contents entries (three or more consecutive dots) are blanked.
    With no question mark found, the whole text becomes the question and the
    answer is empty.
    """
    toc_like = re.compile(r'\.{3,}')
    qa_match = re.search(r'(.*\?.*?)\n', text, re.DOTALL)
    if qa_match is None:
        # No question mark before a newline: treat everything as the question.
        return text.strip(), ""
    question = qa_match.group(1).strip()
    answer = text[qa_match.end():].strip()
    if toc_like.search(question):
        question = ""  # looks like an index entry — discard
    if toc_like.search(answer):
        answer = ""  # looks like an index entry — discard
    return question, answer
def store_vectors(paragraphs, vectors, metadata, filename, page_num):
    """Embed each paragraph and append its metadata + vector to the stores.

    For every paragraph: split into question/answer, standardize the text,
    re-encode the standardized form, then append one entry to ``metadata``
    and the matching vector to the module-level ``embeddings`` list.

    Fix: removed dead locals (``original_text`` was assigned twice and never
    read). NOTE(review): the ``vectors`` parameter is ignored — each paragraph
    is re-encoded from its standardized text; kept in the signature for
    caller compatibility.
    """
    for i, paragraph in enumerate(paragraphs):
        question, answer = split_into_qa(paragraph)
        standardized_text = custom_standardization(tf.constant(paragraph))
        vector = model.encode(standardized_text).tolist()
        metadata.append({
            "index": f'paragraph-{i}',
            "filename": filename,
            "page_num": page_num,
            "standardized_text": standardized_text,
            "question_text": question,
            "answerable_text": answer
        })
        embeddings.append(vector)
# NOTE(review): the route decorator appears to have been lost in extraction —
# without it this view is never registered. Path '/upload' assumed; confirm
# against the front end.
@app.route('/upload', methods=['POST'])
def upload_pdf():
    """Accept one PDF upload, reset all previous state, and ingest it.

    Returns a JSON error (400) for a missing/empty file part; on success the
    uploads folder is wiped, the stale index/metadata files removed, and the
    new PDF processed into a fresh index.
    """
    if 'file' not in request.files:
        return jsonify({'error': 'No file part'}), 400
    file = request.files['file']
    if file.filename == '':
        return jsonify({'error': 'No selected file'}), 400
    if file:
        filename = secure_filename(file.filename)
        # Start from a clean slate: drop any previously uploaded files.
        if os.path.exists(app.config['UPLOAD_FOLDER']):
            shutil.rmtree(app.config['UPLOAD_FOLDER'])
        os.makedirs(app.config['UPLOAD_FOLDER'])
        file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
        file.save(file_path)
        # Remove stale index artifacts independently. Fix: the original
        # wrapped both removes in one try, so a missing first file skipped
        # deleting the second; also use the module-level path constants
        # instead of re-hardcoding the file names.
        for stale in (metadata_path, index_path):
            try:
                os.remove(stale)
            except OSError as e:
                print(f"Error: {e.strerror}")
        process_pdf(file_path, filename)
        print(file_path + filename)
        return jsonify({'success': 'File uploaded and processed successfully'})
    # Fix: previously fell through returning None (HTTP 500).
    return jsonify({'error': 'Invalid file'}), 400
def process_pdf(file_path, filename):
    """Extract, chunk, embed and persist every page of the given PDF."""
    for page_num, page_text in extract_text_from_pdf_with_page_numbers(file_path):
        chunks = split_into_paragraphs(page_text)
        chunk_vectors = text_to_vectors(chunks)
        store_vectors(chunks, chunk_vectors, metadata, filename, page_num)
    save_index_and_metadata()
def save_index_and_metadata():
    """Persist accumulated embeddings to a FAISS L2 index plus JSON metadata.

    Fix: guards against an empty ``embeddings`` list, which previously raised
    ``IndexError`` when reading ``shape[1]`` of a zero-row array.
    """
    if not embeddings:
        print("No embeddings to save; skipping index write.")
        return
    embeddings_array = np.array(embeddings, dtype='float32')
    dimension = embeddings_array.shape[1]
    index = faiss.IndexFlatL2(dimension)
    batch_size = 1000  # add vectors in manageable chunks
    for start in range(0, len(embeddings_array), batch_size):
        index.add(embeddings_array[start:start + batch_size])
    faiss.write_index(index, index_path)
    with open(metadata_path, 'w') as f:
        json.dump(metadata, f)
# Convert an L2 distance into a friendlier 0-100 "relevancy" percentage.
def convert_distance_to_similarity(distance):
    """Map a non-negative distance onto a similarity score in (0, 100]."""
    similarity = 1 / (1 + distance)
    return similarity * 100
def query_index(query, model, index, metadata, top_k=5):
    """Embed ``query``, search the FAISS index, and return hit metadata.

    Each result dict carries the stored metadata fields plus a 0-100
    ``score`` derived from the L2 distance.

    Fix: iterates over the hits actually returned and skips FAISS's ``-1``
    padding ids (emitted when the index holds fewer than ``top_k`` vectors);
    the original looped ``range(top_k)`` unconditionally, so a padded id
    silently fetched ``metadata[-1]``.
    """
    query_embedding = model.encode(query).reshape(1, -1).astype('float32')
    D, I = index.search(query_embedding, top_k)
    results = []
    for distance, doc_id in zip(D[0], I[0]):
        if doc_id < 0:  # FAISS pads missing neighbors with -1
            continue
        doc_metadata = metadata[doc_id]
        results.append({
            "filename": doc_metadata["filename"],
            "page_num": doc_metadata["page_num"],
            "standardized_text": doc_metadata["standardized_text"],
            "question_text": doc_metadata["question_text"],
            "answerable_text": doc_metadata["answerable_text"],
            "score": convert_distance_to_similarity(distance),
        })
    return results
def fetch_answer_from_external_api(question, result):
    """Ask the external chat endpoint to phrase an answer from one hit.

    Sends the question plus the top retrieval result to the hosted Mixtral
    endpoint and returns its ``response`` field ('' when absent).

    Fix: adds a request timeout so a stalled remote service cannot hang the
    worker indefinitely.
    """
    data = {
        "messages": [
            {
                "content": "Question=" + question + ",answer to look from Uploaded pdf file and dont include the field name from the json file in answer section = " + str(result) + "answer=Based on your PDF provided , ",
                "role": "user"
            }
        ],
        "model": "mixtral:8x7b-instruct-v0.1-q6_K"
    }
    print("data=" + str(data))
    response = requests.post(
        'https://inf.cl.uni-trier.de/chat/',
        json=data,
        headers={'accept': 'application/json', 'Content-Type': 'application/json'},
        timeout=120,  # generous: the LLM backend can be slow, but must not hang forever
    )
    response_data = response.json()
    return response_data.get('response', '')
def create_answer_to_show(query, results):
    """Format the retrieval hits into a human-readable summary string.

    Each hit is rendered as a separated section with filename, page number,
    the question-like keywords, up to 500 chars of answer text (when any),
    and its relevancy score; a footer points readers to the original texts.
    """
    separator = "\n------------------------------------------------------------------------------------------------------------------\n"
    pieces = [f"Based on your query '{query}', the following relevant information was found:\n\n"]
    for hit in results:
        pieces.append(separator)
        pieces.append(f"Filename: {hit['filename']}\n")
        pieces.append(f"Page number: {hit['page_num']}\n")
        pieces.append(f"Related keywords: {hit['question_text']}...\n")
        if hit['answerable_text'] != "":
            pieces.append(f"Answer: {hit['answerable_text'][:500]}\n")
        pieces.append(f"Relevancy Score: {hit['score']}\n")
    pieces.append("\nFor more detailed information, please refer to the respective original texts.\n\n\n")
    return "".join(pieces)
# NOTE(review): the route decorator appears to have been lost in extraction —
# without it this view is never registered. Path '/query' assumed; confirm
# against the front end.
@app.route('/query', methods=['POST'])
def query_endpoint():
    """Answer a JSON query against the persisted FAISS index.

    Expects ``{"query": str, "top_k": int}``; returns the external LLM's
    phrasing of the top hit followed by a formatted listing of all hits.
    """
    data = request.json
    query = data.get('query', '')
    top_k = data.get('top_k', 5)
    index = faiss.read_index(index_path)
    # Local name chosen to avoid shadowing the module-level ``metadata`` list.
    with open(metadata_path, 'r') as f:
        stored_metadata = json.load(f)
    results = query_index(query, model, index, stored_metadata, top_k)
    # Fix: guard against zero hits before dereferencing results[0].
    if not results:
        return jsonify({'answer': 'No relevant information found for your query.'})
    formatted_answer = create_answer_to_show(query, results)
    answer2 = fetch_answer_from_external_api(query, results[0])
    print("=>" + answer2)
    return jsonify({'answer': answer2 + "\n\n" + formatted_answer})
if __name__ == '__main__':
    # Development entry point; use a production WSGI server for deployment.
    app.run()