import os
import io
import re
import time
import json
import glob
import shutil
import textwrap
from ast import literal_eval

import pdf2image
import pytesseract
import nltk
import openai
import pandas as pd
from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.chains import create_extraction_chain
from Bio import Entrez
from tqdm.auto import tqdm
import streamlit as st

nltk.download('punkt')
os.environ['OPENAI_API_KEY'] = "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"

Entrez.email = "firqaaa@gmail.com"
Entrez.api_key = "3d7a71231fe7f2d2bd7599e022535199a908"

fold = -1
buffer = io.BytesIO()
# chunk_size = 8000
@st.cache_data
def convert_df(df):
    # Encode the dataframe as UTF-8 CSV bytes so Streamlit can serve it as a download
    return df.to_csv().encode('utf-8')
def replace_quotes(text):
    # Rewrite double-quoted spans as single-quoted ones so stray double quotes
    # do not break the JSON round-trip performed on the extraction output later on
    pattern = r'"([^"]*)"'
    return re.sub(pattern, lambda match: "'" + match.group(1) + "'", text)
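# For example, replace_quotes('The "APOE" allele') is expected to return "The 'APOE' allele".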
def clean_text(text):
    """Remove section titles and figure descriptions from text"""
    pattern = r'[^\w\s]'
    clean = "\n".join([
        row for row in text.split("\n")
        if len(row.split(" ")) > 3 and not row.startswith("(a)") and not row.startswith("Figure")
    ])
    return re.sub(pattern, '', clean)
def truncate_text(text, max_tokens):
    # textwrap splits on character width, so max_tokens acts as a character budget rather than a true token count
    wrapper = textwrap.TextWrapper(width=max_tokens)
    truncated_text = wrapper.wrap(text)
    if len(truncated_text) > 0:
        return truncated_text[0]
    else:
        return ""
def split_text(text, chunk_size):
    chunks = []
    start = 0
    end = chunk_size
    while start < len(text):
        chunks.append(text[start:end])
        start = end
        end += chunk_size
    return chunks
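# For example, split_text("abcdefg", 3) returns ["abc", "def", "g"].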
def extract_gene_name(text):
    # Entrez.efetch returns bytes; decode and strip escape sequences before parsing
    text_str = text.decode("utf-8")
    text_str = text_str.replace("\\n", "").replace("\\t", "").replace("\\'", "'")
    # The gene symbol is wrapped in a <NAME> tag in the dbSNP XML payload
    pattern = r"<NAME>(.*?)</NAME>"
    match = re.search(pattern, text_str)
    if match:
        gene_name = match.group(1)
        return gene_name
    else:
        return None
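# For example, extract_gene_name(b"...<NAME>APOE</NAME>...") is expected to return "APOE".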
def get_geneName(rsid):
    # Resolve the gene symbol for an rsID via the NCBI dbSNP efetch endpoint
    text = Entrez.efetch(db="snp", id=rsid, retmode='xml').read()
    text = extract_gene_name(text)
    return text
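# For example, get_geneName("rs429358") queries dbSNP and is expected to return the associated gene symbol ("APOE").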
def split_text_into_sentences(text, num_sentences):
    sentences = nltk.sent_tokenize(text)
    grouped_sentences = [sentences[i:i + num_sentences] for i in range(0, len(sentences), num_sentences)]
    return grouped_sentences
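# For example, a six-sentence text with num_sentences=2 yields three groups of two sentences each.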
def flatten_list(nested_list):
    flattened_list = []
    for item in nested_list:
        if isinstance(item, list):
            flattened_list.extend(flatten_list(item))
        else:
            flattened_list.append(item)
    return flattened_list
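# For example, flatten_list([1, [2, [3, 4]], 5]) returns [1, 2, 3, 4, 5].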
def move_file(source_path, destination_path):
    # Make sure the destination folder exists before moving the file
    if not os.path.exists(destination_path):
        os.makedirs(destination_path)
    try:
        shutil.move(source_path, destination_path)
        print(f"File moved successfully from '{source_path}' to '{destination_path}'.")
    except Exception as e:
        print(f"Error: {e}")
llm = ChatOpenAI(temperature=0, model="gpt-3.5-turbo-16k-0613")

schema = {
    "properties": {
        "title": {"type": "string"},
        "author": {"type": "string"},
        "publisher": {"type": "string"},
        "publication_year": {"type": "string"},
        "gene_codes": {"type": "string"},
        "population_race": {"type": "string"},
        "phenotypes_or_diseases": {"type": "string"},
        "sample_size": {"type": "string"},
        "SNPs": {"type": "string"},
        "Study_Methodology": {"type": "string"},
        "Study_Level": {"type": "string"},
        "Outcome/Recommendation/Conclusion": {"type": "string"}
    },
    "required": ["title"]
}

chain = create_extraction_chain(schema, llm)
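# chain.run(text) is expected to return a list of dicts keyed by the schema properties; only the first dict per chunk is used below.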
err_path = []

# Page title
st.set_page_config(page_title="PubMed Paper Extraction")
st.title("PubMed Paper Extraction")

uploaded_file = st.file_uploader('Upload Paper Here : ', type="pdf")
if uploaded_file:
    st.write(f"{uploaded_file.name} successfully uploaded")

    chunk_size = st.selectbox(
        'Token amount per process :',
        (16000, 12000, 10000, 8000, 5000)
    )

    parseButton = st.button("Extract Text")
    if parseButton:
        with st.spinner(text='Extraction in progress ...'):
            try:
                # OCR every page except the last one
                images = pdf2image.convert_from_bytes(uploaded_file.getvalue())
                extracted_text = ""
                for image in images[:-1]:
                    text = pytesseract.image_to_string(image)
                    text = clean_text(text)
                    extracted_text += text + " "
                text = replace_quotes(extracted_text)
                text_chunk = split_text(text, chunk_size)[:fold]

                chunkdf = []
                for i, chunk in enumerate(text_chunk):
                    inp = chunk
                    # Round-trip the first extraction result through json.dumps/literal_eval to coerce it into a one-row dataframe
                    df = pd.DataFrame(literal_eval(str(json.dumps(chain.run(inp)[0])).replace("\'", "\"")), index=[0]).fillna('')
                    chunkdf.append(df)

                concat = pd.concat(chunkdf, axis=0).reset_index().drop('index', axis=1).fillna('')
                # Bibliographic fields should be identical across chunks; take them from the first chunk
                concat['title'] = concat['title'][0]
                concat['author'] = concat['author'][0]
                concat['publisher'] = concat['publisher'][0]
                concat['publication_year'] = concat['publication_year'][0]
                # concat = concat.min().to_frame().T
                # Keep only SNP identifiers that look like rsIDs
                concat['SNPs'] = concat['SNPs'].apply(lambda x: x if x.startswith('rs') else '')
                # Blank out placeholder values returned by the model
                for col in list(concat.columns):
                    concat[col] = concat[col].apply(lambda x: x if x not in ['N/A', 'not mentioned', 'Not mentioned', 'Unknown'] else '')
                L = []
                for i in range(len(concat)):
                    # No SNPs for this chunk: emit one row per extracted gene code
                    if (len(concat['gene_codes'][i].split(',')) >= 1) and concat['SNPs'][i] == '':
                        for g in concat['gene_codes'][i].split(','):
                            L.append({
                                'Title': concat['title'][0],
                                'Author': concat['author'][0],
                                'Publisher': concat['publisher'][0],
                                'Publication Year': concat['publication_year'][0],
                                'Genes': g.upper(),
                                'Population': concat['population_race'][i],
                                'Phenotype': concat['phenotypes_or_diseases'][i].title(),
                                'Sample Size': concat['sample_size'][i],
                                'SNPs': concat['SNPs'][i],
                                'Study Methodology': concat['Study_Methodology'][i].title(),
                                'Study Level': concat['Study_Level'][i].title(),
                                'Outcomes': concat['Outcome/Recommendation/Conclusion'][i].capitalize()
                            })
                    # SNPs present: emit one row per rsID and resolve its gene symbol via dbSNP
                    elif (len(concat['SNPs'][i].split(',')) >= 1):
                        for s in concat['SNPs'][i].split(','):
                            try:
                                L.append({
                                    'Title': concat['title'][0],
                                    'Author': concat['author'][0],
                                    'Publisher': concat['publisher'][0],
                                    'Publication Year': concat['publication_year'][0],
                                    'Genes': get_geneName(s.strip()).upper(),
                                    'Population': concat['population_race'][0],
                                    'Phenotype': concat['phenotypes_or_diseases'][i].title(),
                                    'Sample Size': concat['sample_size'][i],
                                    'SNPs': s,
                                    'Study Methodology': concat['Study_Methodology'][i],
                                    'Study Level': concat['Study_Level'][i].title(),
                                    'Outcomes': concat['Outcome/Recommendation/Conclusion'][i].capitalize()
                                })
                            except Exception as e:
                                # Gene lookup failed; keep the row with an empty gene symbol
                                L.append({
                                    'Title': concat['title'][0],
                                    'Author': concat['author'][0],
                                    'Publisher': concat['publisher'][0],
                                    'Publication Year': concat['publication_year'][0],
                                    'Genes': '',
                                    'Population': concat['population_race'][0],
                                    'Phenotype': concat['phenotypes_or_diseases'][i].title(),
                                    'Sample Size': concat['sample_size'][i],
                                    'SNPs': s,
                                    'Study Methodology': concat['Study_Methodology'][i],
                                    'Study Level': concat['Study_Level'][i].title(),
                                    'Outcomes': concat['Outcome/Recommendation/Conclusion'][i].capitalize()
                                })
                result = pd.DataFrame(L)
                st.dataframe(result)
                # csv = convert_df(result)

                # Write the result to an in-memory Excel workbook; the context manager closes the writer on exit
                with pd.ExcelWriter(buffer, engine='xlsxwriter') as writer:
                    result.to_excel(writer, sheet_name='Result')

                st.download_button(
                    label="Save Result",
                    data=buffer,
                    file_name=str(uploaded_file.name).replace('.pdf', '') + '.xlsx',
                    mime='application/vnd.ms-excel'
                )
            except json.JSONDecodeError:
                st.write("Sorry, we are experiencing difficulties extracting the information. Please try again with a different context length.")
            except Exception:
                st.write("Sorry, we are experiencing difficulties extracting the information. Please ensure that the uploaded file is not corrupted.")