import re
import os
import json
import tempfile
import subprocess
import nltk

from Bio import Entrez
from docx import Document
import fitz  # PyMuPDF
import spacy
from spacy.cli import download

# model/pipeline are referenced below as `model.` and `pipeline.`,
# so import them under those names.
from core import model, pipeline
from core.drive_utils import upload_file_to_drive
from core.NER.PDF import pdf
from core.NER.WordDoc import wordDoc
from core.NER.html import extractHTML
from core.NER.word2Vec import word2vec

import urllib.parse
import requests
from pathlib import Path
import pandas as pd

# tiktoken is optional; merge_text_and_tables falls back to a rough
# character-based estimate when it is unavailable.
try:
    import tiktoken
except ImportError:
    tiktoken = None

nltk.download('punkt_tab')

def download_excel_file(url, save_path="temp.xlsx"):
    # Office Online viewer links wrap the real file URL in the "src" query parameter.
    if "view.officeapps.live.com" in url:
        parsed_url = urllib.parse.parse_qs(urllib.parse.urlparse(url).query)
        real_url = urllib.parse.unquote(parsed_url["src"][0])
        response = requests.get(real_url)
        response.raise_for_status()
        with open(save_path, "wb") as f:
            f.write(response.content)
        return save_path
    elif url.startswith("http") and (url.endswith(".xls") or url.endswith(".xlsx")):
        response = requests.get(url)
        response.raise_for_status()
        with open(save_path, "wb") as f:
            f.write(response.content)
        print(len(response.content))
        return save_path
    else:
        print("URL must point directly to an .xls or .xlsx file, or the file is already downloaded.")
        return url
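
# Usage sketch (hypothetical URL; performs a real network request if called):
#   download_excel_file("https://example.org/data.xlsx", save_path="data.xlsx")  # -> "data.xlsx"
# Office-viewer links (view.officeapps.live.com?src=...) are unwrapped first.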

def extract_text(link, saveFolder):
    try:
        text = ""
        name = link.split("/")[-1]
        print("name: ", name)
        local_temp_path = os.path.join(tempfile.gettempdir(), name)
        print("this is local temp path: ", local_temp_path)
        if os.path.exists(local_temp_path):
            input_to_class = local_temp_path
            print("exist")
        else:
            # Prefer a cached copy on Google Drive; otherwise fetch from the
            # web and upload the file so later runs can reuse it.
            file_id = pipeline.find_drive_file(name, saveFolder)
            if file_id:
                print("📥 Downloading from Google Drive...")
                pipeline.download_file_from_drive(name, saveFolder, local_temp_path)
            else:
                print("🌐 Downloading from web link...")
                response = requests.get(link)
                with open(local_temp_path, 'wb') as f:
                    f.write(response.content)
                print("✅ Saved locally.")
                pipeline.upload_file_to_drive(local_temp_path, name, saveFolder)
            input_to_class = local_temp_path
        print(input_to_class)

        if link.endswith(".pdf"):
            print("inside pdf and input to class: ", input_to_class)
            print("save folder in extract text: ", saveFolder)
            p = pdf.PDF(input_to_class, saveFolder)
            text = p.extractText()
            print("text from pdf:")
            print(text)
        elif link.endswith(".doc") or link.endswith(".docx"):
            d = wordDoc.wordDoc(input_to_class, saveFolder)
            text = d.extractTextByPage()
        else:
            # Spreadsheets are handled by extract_table(); only non-Excel
            # links are treated as HTML here.
            if link.split(".")[-1].lower() not in ("xls", "xlsx"):
                if "http" in link or "html" in link:
                    print("html link: ", link)
                    html = extractHTML.HTML("", link)
                    text = html.getListSection()
                    print("text html: ")
                    print(text)

        if name:
            if os.path.exists(local_temp_path):
                os.remove(local_temp_path)
                print(f"🧹 Deleted local temp file: {local_temp_path}")
        print("done extract text")
    except Exception:
        text = ""
    return text

def extract_table(link, saveFolder):
    try:
        table = []
        name = link.split("/")[-1]
        local_temp_path = os.path.join(tempfile.gettempdir(), name)
        if os.path.exists(local_temp_path):
            input_to_class = local_temp_path
            print("exist")
        else:
            # Same caching strategy as extract_text: try Drive first, then the web.
            file_id = pipeline.find_drive_file(name, saveFolder)
            if file_id:
                print("📥 Downloading from Google Drive...")
                pipeline.download_file_from_drive(name, saveFolder, local_temp_path)
            else:
                print("🌐 Downloading from web link...")
                response = requests.get(link)
                with open(local_temp_path, 'wb') as f:
                    f.write(response.content)
                print("✅ Saved locally.")
                pipeline.upload_file_to_drive(local_temp_path, name, saveFolder)
            input_to_class = local_temp_path
        print(input_to_class)

        if link.endswith(".pdf"):
            p = pdf.PDF(input_to_class, saveFolder)
            table = p.extractTable()
        elif link.endswith(".doc") or link.endswith(".docx"):
            d = wordDoc.wordDoc(input_to_class, saveFolder)
            table = d.extractTableAsList()
        elif link.split(".")[-1].lower() in ("xls", "xlsx"):
            savePath = saveFolder + "/" + link.split("/")[-1]
            # Resolve viewer links and download, then read every sheet.
            excelPath = download_excel_file(link, savePath)
            try:
                xls = pd.ExcelFile(excelPath)
                table_list = []
                for sheet_name in xls.sheet_names:
                    df = pd.read_excel(xls, sheet_name=sheet_name)
                    cleaned_table = df.fillna("").astype(str).values.tolist()
                    table_list.append(cleaned_table)
                table = table_list
            except Exception as e:
                print("❌ Failed to extract tables from Excel:", e)
        elif "http" in link or "html" in link:
            html = extractHTML.HTML("", link)
            table = html.extractTable()
            table = clean_tables_format(table)

        if os.path.exists(local_temp_path):
            os.remove(local_temp_path)
            print(f"🧹 Deleted local temp file: {local_temp_path}")
    except Exception:
        table = []
    return table

def clean_tables_format(tables):
    """
    Ensures all tables are in a consistent format: List[List[List[str]]]
    Cleans by:
    - Removing empty strings and rows
    - Converting all cells to strings
    - Handling DataFrames and list-of-lists
    """
    cleaned = []
    if tables:
        for table in tables:
            standardized = []
            if isinstance(table, pd.DataFrame):
                table = table.fillna("").astype(str).values.tolist()
            if isinstance(table, list) and all(isinstance(row, list) for row in table):
                for row in table:
                    filtered_row = [str(cell).strip() for cell in row if str(cell).strip()]
                    if filtered_row:
                        standardized.append(filtered_row)
            if standardized:
                cleaned.append(standardized)
    return cleaned
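
# Minimal check of clean_tables_format on mixed input (illustrative only).
if __name__ == "__main__":
    sample = [pd.DataFrame({"A": [1, None]}), [["x", ""], ["", ""]]]
    print(clean_tables_format(sample))  # -> [[['1.0']], [['x']]]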

def normalize_text_for_comparison(s: str) -> str:
    """
    Normalizes text for robust comparison by:
    1. Converting to lowercase.
    2. Normalizing Windows/Mac line endings to '\n'.
    3. Collapsing all runs of whitespace (including newlines) into single spaces.
    4. Stripping leading/trailing whitespace from the entire string.
    """
    s = s.lower()
    s = s.replace('\r\n', '\n')
    s = s.replace('\r', '\n')
    s = re.sub(r'\s+', ' ', s)
    return s.strip()
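
# Quick sanity check (illustrative only).
if __name__ == "__main__":
    print(normalize_text_for_comparison("Sample  A\r\nfrom   LAOS "))  # -> "sample a from laos"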

def merge_text_and_tables(text, tables, max_tokens=12000, keep_tables=True, tokenizer="cl100k_base", accession_id=None, isolate=None):
    """
    Merge cleaned text and tables into one string for LLM input.
    - Avoids duplicating table rows already present in the text
    - Keeps only tables relevant to the location keywords, accession, or isolate
    (max_tokens and keep_tables are currently unused.)
    """
    def estimate_tokens(text_str):
        # Exact count via tiktoken when available; otherwise a rough chars/4 estimate.
        try:
            enc = tiktoken.get_encoding(tokenizer)
            return len(enc.encode(text_str))
        except Exception:
            return len(text_str) // 4

    def is_table_relevant(table, keywords, accession_id=None):
        flat = " ".join(" ".join(row).lower() for row in table)
        if accession_id and accession_id.lower() in flat:
            return True
        return any(kw.lower() in flat for kw in keywords)

    preview, preview1 = "", ""
    llm_input = "## Document Text\n" + text.strip() + "\n"
    clean_text = normalize_text_for_comparison(text)

    if tables:
        for idx, table in enumerate(tables):
            keywords = ["province", "district", "region", "village", "location",
                        "country", "origin", "ancient", "modern"]
            if accession_id: keywords += [accession_id.lower()]
            if isolate: keywords += [isolate.lower()]
            if is_table_relevant(table, keywords, accession_id):
                if len(table) > 0:
                    for tab in table:
                        preview = " ".join(tab) if tab else ""
                        preview1 = "\n".join(tab) if tab else ""
                        clean_preview = normalize_text_for_comparison(preview)
                        clean_preview1 = normalize_text_for_comparison(preview1)
                        # Append only rows whose content is not already in the text.
                        if clean_preview not in clean_text:
                            if clean_preview1 not in clean_text:
                                table_str = json.dumps([tab], indent=2)
                                llm_input += f"## Table {idx+1}\n{table_str}\n"
    return llm_input.strip()
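
# Usage sketch on toy data (illustrative only; the accession "X1" is made up).
if __name__ == "__main__":
    toy_text = "Isolate X1 was sequenced."
    toy_tables = [[["accession", "country"], ["X1", "Laos"]]]
    print(merge_text_and_tables(toy_text, toy_tables, accession_id="X1"))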

def preprocess_document(link, saveFolder, accession=None, isolate=None):
    try:
        text = extract_text(link, saveFolder)
        print("text and link")
        print(link)
        print(text)
    except Exception:
        text = ""
    try:
        tables = extract_table(link, saveFolder)
    except Exception:
        tables = []
    try:
        final_input = merge_text_and_tables(text, tables, max_tokens=12000,
                                            accession_id=accession, isolate=isolate)
    except Exception:
        final_input = ""
    return text, tables, final_input

def extract_sentences(text):
    sentences = re.split(r'(?<=[.!?])\s+', text)
    return [s.strip() for s in sentences if s.strip()]
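
# Example (illustrative only).
if __name__ == "__main__":
    print(extract_sentences("First sentence. Second! Third?"))
    # -> ['First sentence.', 'Second!', 'Third?']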

def is_irrelevant_number_sequence(text):
    # Sentences with code-like tokens (e.g. "AB123") or word-number pairs are kept.
    if re.search(r'\b[A-Z]{2,}\d+\b|\b[A-Za-z]+\s+\d+\b', text, re.IGNORECASE):
        return False
    word_count = len(re.findall(r'\b[A-Za-z]{2,}\b', text))
    number_count = len(re.findall(r'\b\d[\d\.]*\b', text))
    total_tokens = len(re.findall(r'\S+', text))
    # Mostly numbers with hardly any words -> likely stray table content.
    if total_tokens > 0 and (word_count / total_tokens < 0.2) and (number_count / total_tokens > 0.5):
        return True
    elif re.fullmatch(r'(\d+(\.\d+)?\s*)+', text.strip()):
        return True
    return False
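
# Examples (illustrative only).
if __name__ == "__main__":
    print(is_irrelevant_number_sequence("12.5 3.4 5.6"))               # True: numbers only
    print(is_irrelevant_number_sequence("Samples collected in 2019"))  # False: word + number pair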

def remove_isolated_single_digits(sentence):
    # Standalone '0' and '1' tokens are usually table/figure noise; drop them.
    tokens = sentence.split()
    filtered_tokens = [token for token in tokens if token not in ('0', '1')]
    return ' '.join(filtered_tokens).strip()
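
# Example (illustrative only).
if __name__ == "__main__":
    print(remove_isolated_single_digits("site 1 yielded 12 samples and 0 controls"))
    # -> "site yielded 12 samples and controls"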

def get_contextual_sentences_BFS(text_content, keyword, depth=2):
    def extract_codes(sentence):
        # Code-like tokens: two or more letters followed by digits (e.g. "AB123").
        return [code for code in re.findall(r'\b[A-Z]{2,}[0-9]+\b', sentence, re.IGNORECASE)]

    sentences = extract_sentences(text_content)
    relevant_sentences = set()
    initial_keywords = set()

    # Split a code into its prefix and numeric suffix (e.g. "AB123" -> "ab", 123).
    code_pattern = re.compile(r'([A-Z0-9]+?)(\d+)$', re.IGNORECASE)
    keyword_match = code_pattern.search(keyword)
    keyword_prefix = None
    keyword_num = None
    if keyword_match:
        keyword_prefix = keyword_match.group(1).lower()
        keyword_num = int(keyword_match.group(2))

    for sentence in sentences:
        sentence_added = False

        # Direct mention of the keyword.
        if re.search(r'\b' + re.escape(keyword) + r'\b', sentence, re.IGNORECASE):
            relevant_sentences.add(sentence.strip())
            initial_keywords.add(keyword.lower())
            sentence_added = True

        # Ranges such as "AB120-AB125" that implicitly contain the keyword.
        range_matches = re.finditer(r'([A-Z0-9]+\d+)-([A-Z0-9]+\d+)', sentence, re.IGNORECASE)
        for r_match in range_matches:
            start_code_str = r_match.group(1)
            end_code_str = r_match.group(2)
            start_match = code_pattern.search(start_code_str)
            end_match = code_pattern.search(end_code_str)
            if keyword_prefix and keyword_num is not None and start_match and end_match:
                start_prefix = start_match.group(1).lower()
                end_prefix = end_match.group(1).lower()
                start_num = int(start_match.group(2))
                end_num = int(end_match.group(2))
                if keyword_prefix == start_prefix and \
                   keyword_prefix == end_prefix and \
                   start_num <= keyword_num <= end_num:
                    relevant_sentences.add(sentence.strip())
                    initial_keywords.add(start_code_str.lower())
                    initial_keywords.add(end_code_str.lower())
                    sentence_added = True
                    break

        if sentence_added:
            for word in extract_codes(sentence):
                initial_keywords.add(word.lower())

    # Map each code to the sentences that mention it.
    word_to_sentences = {}
    for sent in sentences:
        codes_in_sent = set(extract_codes(sent))
        for code in codes_in_sent:
            word_to_sentences.setdefault(code.lower(), set()).add(sent.strip())

    # Co-occurrence graph: codes appearing in the same sentence are neighbors.
    graph = {}
    for sent in sentences:
        codes = set(extract_codes(sent))
        for word1 in codes:
            word1_lower = word1.lower()
            graph.setdefault(word1_lower, set())
            for word2 in codes:
                word2_lower = word2.lower()
                if word1_lower != word2_lower:
                    graph[word1_lower].add(word2_lower)

    # Breadth-first walk over the code graph, up to `depth` hops from the seeds.
    queue = [(k, 0) for k in initial_keywords if k in word_to_sentences]
    visited_words = set(initial_keywords)
    while queue:
        current_word, level = queue.pop(0)
        if level >= depth:
            continue
        relevant_sentences.update(word_to_sentences.get(current_word, []))
        for neighbor in graph.get(current_word, []):
            if neighbor not in visited_words:
                visited_words.add(neighbor)
                queue.append((neighbor, level + 1))

    final_sentences = set()
    for sentence in relevant_sentences:
        if not is_irrelevant_number_sequence(sentence):
            processed_sentence = remove_isolated_single_digits(sentence)
            if processed_sentence:
                final_sentences.add(processed_sentence)

    return "\n".join(sorted(list(final_sentences)))

def get_contextual_sentences_DFS(text_content, keyword, depth=2):
    sentences = extract_sentences(text_content)

    # Map each word-like token to the sentences that mention it.
    word_to_sentences = {}
    for sent in sentences:
        words_in_sent = set(re.findall(r'\b[A-Za-z0-9\-_\/]+\b', sent))
        for word in words_in_sent:
            word_to_sentences.setdefault(word.lower(), set()).add(sent.strip())

    def extract_codes(sentence):
        # Code-like tokens: two or more letters followed by digits (e.g. "AB123").
        return [code for code in re.findall(r'\b[A-Z]{2,}[0-9]+\b', sentence, re.IGNORECASE)]

    def dfs_traverse(current_word, current_depth, max_depth, visited_words,
                     collected_sentences, parent_sentence=None):
        country = "unknown"
        if current_depth > max_depth:
            return country, False
        if current_word not in word_to_sentences:
            return country, False

        for sentence in word_to_sentences[current_word]:
            if sentence == parent_sentence:
                continue
            collected_sentences.add(sentence)

            # Try a narrow window around the keyword first, then the whole sentence.
            small_sen = extract_context(sentence, current_word, int(len(sentence) / 4))
            country = model.get_country_from_text(small_sen)
            if country.lower() != "unknown":
                return country, True
            else:
                country = model.get_country_from_text(sentence)
                if country.lower() != "unknown":
                    return country, True

            codes_in_sentence = extract_codes(sentence)
            idx = next((i for i, code in enumerate(codes_in_sentence) if code.lower() == current_word.lower()), None)
            if idx is None:
                continue

            # Visit the codes closest to the current one first, preferring those after it.
            sorted_children = sorted(
                [code for code in codes_in_sentence if code.lower() not in visited_words],
                key=lambda x: (abs(codes_in_sentence.index(x) - idx),
                               0 if codes_in_sentence.index(x) > idx else 1)
            )
            for child in sorted_children:
                child_lower = child.lower()
                if child_lower not in visited_words:
                    visited_words.add(child_lower)
                    country, should_stop = dfs_traverse(
                        child_lower, current_depth + 1, max_depth,
                        visited_words, collected_sentences, parent_sentence=sentence
                    )
                    if should_stop:
                        return country, True

        return country, False

    collected_sentences = set()
    visited_words = set([keyword.lower()])
    country, status = dfs_traverse(keyword.lower(), 0, depth, visited_words, collected_sentences)

    final_sentences = set()
    for sentence in collected_sentences:
        if not is_irrelevant_number_sequence(sentence):
            processed = remove_isolated_single_digits(sentence)
            if processed:
                final_sentences.add(processed)
    if not final_sentences:
        return country, text_content
    return country, "\n".join(sorted(list(final_sentences)))

def normalize_for_overlap(s: str) -> str:
    # Lowercase and strip punctuation so overlap detection tolerates formatting noise.
    s = re.sub(r'[^a-zA-Z0-9\s]', ' ', s).lower()
    s = re.sub(r'\s+', ' ', s).strip()
    return s
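
# Example (illustrative only).
if __name__ == "__main__":
    print(normalize_for_overlap("Hello,   WORLD!"))  # -> "hello world"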

def merge_texts_skipping_overlap(text1: str, text2: str) -> str:
    if not text1: return text2
    if not text2: return text1

    # One text fully contains the other.
    if text2 in text1:
        return text1
    if text1 in text2:
        return text2

    # Case 1: the end of text1 overlaps the start of text2 (exactly, or after
    # normalization). Find the longest such junction and splice there.
    max_junction_overlap = 0
    for i in range(min(len(text1), len(text2)), 0, -1):
        suffix1 = text1[-i:]
        prefix2 = text2[:i]
        if suffix1 == prefix2:
            max_junction_overlap = i
            break
        elif normalize_for_overlap(suffix1) == normalize_for_overlap(prefix2):
            max_junction_overlap = i
            break

    if max_junction_overlap > 0:
        merged_text = text1 + text2[max_junction_overlap:]
        return re.sub(r'\s+', ' ', merged_text).strip()

    # Case 2: the two texts share a common prefix; keep it once and append
    # both distinct remainders.
    longest_common_prefix_len = 0
    min_len = min(len(text1), len(text2))
    for i in range(min_len):
        if text1[i] == text2[i]:
            longest_common_prefix_len = i + 1
        else:
            break

    if longest_common_prefix_len > 0 and \
       text1[longest_common_prefix_len:].strip() and \
       text2[longest_common_prefix_len:].strip():
        common_prefix_str = text1[:longest_common_prefix_len]
        remainder_text1 = text1[longest_common_prefix_len:]
        remainder_text2 = text2[longest_common_prefix_len:]
        merged_text = common_prefix_str + remainder_text1 + remainder_text2
        return re.sub(r'\s+', ' ', merged_text).strip()

    # Fallback: no overlap found, so concatenate.
    merged_text = text1 + text2
    return re.sub(r'\s+', ' ', merged_text).strip()
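
# Example (illustrative only): the exact junction "AB12" is spliced once.
if __name__ == "__main__":
    print(merge_texts_skipping_overlap("The accession is AB12", "AB12345 was sampled"))
    # -> "The accession is AB12345 was sampled"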

def save_text_to_docx(text_content: str, full_local_path: str):
    document = Document()
    for paragraph_text in text_content.split('\n'):
        document.add_paragraph(paragraph_text)
    document.save(full_local_path)
    print(f"✅ Saved DOCX locally: {full_local_path}")

'''Two scenarios:
- Quick look finds the keyword, and the deep dive directly yields a location: stop.
- Quick look finds the keyword but the deep dive finds no location: hold the related
  words, then search the other files iteratively for each related word until a
  location is found, then stop.'''

def extract_context(text, keyword, window=500):
    # Split the keyword into an alphanumeric prefix and numeric suffix
    # (e.g. "AB123" -> "ab", 123) so a prefix search can serve as a fallback.
    code_pattern = re.compile(r'([A-Z0-9]+?)(\d+)$', re.IGNORECASE)
    keyword_match = code_pattern.search(keyword)
    keyword_prefix = None
    keyword_num = None
    if keyword_match:
        keyword_prefix = keyword_match.group(1).lower()
        keyword_num = int(keyword_match.group(2))
    text = text.lower()
    idx = text.find(keyword.lower())
    if idx == -1:
        if keyword_prefix:
            idx = text.find(keyword_prefix)
        if idx == -1:
            return "Sample ID not found."
        return text[max(0, idx - window): idx + window]
    return text[max(0, idx - window): idx + window]
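
# Example (illustrative only); note the function lowercases its input.
if __name__ == "__main__":
    print(extract_context("Isolate AB123 was collected near the Mekong river.", "AB123", window=15))
    # -> "isolate ab123 was colle"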

def process_inputToken(filePaths, saveLinkFolder, accession=None, isolate=None):
    cache = {}
    country = "unknown"
    output = ""
    tem_output, small_output = "", ""
    keyword_appear = (False, "")
    keywords = []
    if isolate: keywords.append(isolate)
    if accession: keywords.append(accession)
    for f in filePaths:
        if keywords:
            for keyword in keywords:
                text, tables, final_input = preprocess_document(f, saveLinkFolder, isolate=keyword)
                if keyword in final_input:
                    # Keyword found in this file: try a narrow context window
                    # first, then the whole document.
                    context = extract_context(final_input, keyword)
                    country = model.get_country_from_text(context)
                    if country != "unknown":
                        return country, context, final_input
                    else:
                        country = model.get_country_from_text(final_input)
                        if country != "unknown":
                            return country, context, final_input
                        else:
                            # Still unknown: expand via BFS/DFS over related codes.
                            keyword_appear = (True, f)
                            cache[f] = context
                            small_output = merge_texts_skipping_overlap(output, context) + "\n"
                            chunkBFS = get_contextual_sentences_BFS(small_output, keyword)
                            countryBFS = model.get_country_from_text(chunkBFS)
                            countryDFS, chunkDFS = get_contextual_sentences_DFS(output, keyword)
                            output = merge_texts_skipping_overlap(output, final_input)
                            if countryDFS != "unknown" and countryBFS != "unknown":
                                # Both searches succeeded; prefer the shorter evidence chunk.
                                if len(chunkDFS) <= len(chunkBFS):
                                    return countryDFS, chunkDFS, output
                                else:
                                    return countryBFS, chunkBFS, output
                            else:
                                if countryDFS != "unknown":
                                    return countryDFS, chunkDFS, output
                                if countryBFS != "unknown":
                                    return countryBFS, chunkBFS, output
                else:
                    '''cross-ref example: the keyword A1YU101 is in file 2, which includes
                    KM1, but KM1 is in file 1; if we looked at file 1 first then maybe we
                    could keep a lookup dict with the country, such as Thailand, as the
                    key and its re'''
                    cache[f] = final_input
                    if keyword_appear[0]:
                        # The keyword appeared in an earlier file: fold the other cached
                        # files into the combined text and retry BFS/DFS.
                        for c in cache:
                            if c != keyword_appear[1]:
                                if cache[c].lower() not in output.lower():
                                    output = merge_texts_skipping_overlap(output, cache[c]) + "\n"
                                    chunkBFS = get_contextual_sentences_BFS(output, keyword)
                                    countryBFS = model.get_country_from_text(chunkBFS)
                                    countryDFS, chunkDFS = get_contextual_sentences_DFS(output, keyword)
                                    if countryDFS != "unknown" and countryBFS != "unknown":
                                        if len(chunkDFS) <= len(chunkBFS):
                                            return countryDFS, chunkDFS, output
                                        else:
                                            return countryBFS, chunkBFS, output
                                    else:
                                        if countryDFS != "unknown":
                                            return countryDFS, chunkDFS, output
                                        if countryBFS != "unknown":
                                            return countryBFS, chunkBFS, output
                    else:
                        if cache[f].lower() not in output.lower():
                            output = merge_texts_skipping_overlap(output, cache[f]) + "\n"
    if len(output) == 0 or not keyword_appear[0]:
        for c in cache:
            if cache[c].lower() not in output.lower():
                output = merge_texts_skipping_overlap(output, cache[c]) + "\n"
    return country, "", output