Spaces:
Running
Running
Update pipeline.py
Browse files- pipeline.py +38 -3
pipeline.py
CHANGED
|
@@ -245,10 +245,14 @@ def unique_preserve_order(seq):
|
|
| 245 |
seen = set()
|
| 246 |
return [x for x in seq if not (x in seen or seen.add(x))]
|
| 247 |
# Main execution
|
| 248 |
-
def pipeline_with_gemini(accessions,niche_cases=None):
|
| 249 |
# output: country, sample_type, ethnic, location, money_cost, time_cost, explain
|
| 250 |
# there can be one accession number in the accessions
|
| 251 |
# Prices are per 1,000 tokens
|
|
|
|
|
|
|
|
|
|
|
|
|
| 252 |
PRICE_PER_1K_INPUT_LLM = 0.000075 # $0.075 per 1M tokens
|
| 253 |
PRICE_PER_1K_OUTPUT_LLM = 0.0003 # $0.30 per 1M tokens
|
| 254 |
PRICE_PER_1K_EMBEDDING_INPUT = 0.000025 # $0.025 per 1M tokens
|
|
@@ -330,6 +334,9 @@ def pipeline_with_gemini(accessions,niche_cases=None):
|
|
| 330 |
file_all_path = os.path.join(LOCAL_TEMP_DIR, all_filename)
|
| 331 |
# file_chunk_path = os.path.join(tempfile.gettempdir(), chunk_filename)
|
| 332 |
# file_all_path = os.path.join(tempfile.gettempdir(), all_filename)
|
|
|
|
|
|
|
|
|
|
| 333 |
print(file_chunk_path)
|
| 334 |
chunk_id = find_drive_file(chunk_filename, sample_folder_id)
|
| 335 |
all_id = find_drive_file(all_filename, sample_folder_id)
|
|
@@ -386,6 +393,9 @@ def pipeline_with_gemini(accessions,niche_cases=None):
|
|
| 386 |
accession, isolate = None, None
|
| 387 |
if acc != "unknown": accession = acc
|
| 388 |
if iso != "unknown": isolate = iso
|
|
|
|
|
|
|
|
|
|
| 389 |
# check doi first
|
| 390 |
if doi != "unknown":
|
| 391 |
link = 'https://doi.org/' + doi
|
|
@@ -413,6 +423,9 @@ def pipeline_with_gemini(accessions,niche_cases=None):
|
|
| 413 |
# filter the quality link
|
| 414 |
print("saveLinkFolder as sample folder id: ", sample_folder_id)
|
| 415 |
print("start the smart filter link")
|
|
|
|
|
|
|
|
|
|
| 416 |
# success_process, output_process = run_with_timeout(smart_fallback.filter_links_by_metadata,args=(tem_links,sample_folder_id),kwargs={"accession":acc})
|
| 417 |
# if success_process:
|
| 418 |
# links = output_process
|
|
@@ -439,6 +452,9 @@ def pipeline_with_gemini(accessions,niche_cases=None):
|
|
| 439 |
# if not all_output:
|
| 440 |
# text_all, table_all, document_title_all = model.read_docx_text(file_all_path)
|
| 441 |
# all_output = data_preprocess.normalize_for_overlap(text_all) + "\n" + data_preprocess.normalize_for_overlap(". ".join(table_all))
|
|
|
|
|
|
|
|
|
|
| 442 |
if chunk_exists:
|
| 443 |
print("File chunk exists!")
|
| 444 |
if not chunk:
|
|
@@ -466,6 +482,9 @@ def pipeline_with_gemini(accessions,niche_cases=None):
|
|
| 466 |
else: query_kw = acc
|
| 467 |
#text_link, tables_link, final_input_link = data_preprocess.preprocess_document(link,saveLinkFolder, isolate=query_kw)
|
| 468 |
success_process, output_process = run_with_timeout(data_preprocess.preprocess_document,args=(link,sample_folder_id),kwargs={"isolate":query_kw,"accession":acc},timeout=100)
|
|
|
|
|
|
|
|
|
|
| 469 |
if success_process:
|
| 470 |
text_link, tables_link, final_input_link = output_process[0], output_process[1], output_process[2]
|
| 471 |
print("yes succeed for process document")
|
|
@@ -474,6 +493,9 @@ def pipeline_with_gemini(accessions,niche_cases=None):
|
|
| 474 |
if context != "Sample ID not found.":
|
| 475 |
if len(data_preprocess.normalize_for_overlap(chunk)) < 1000*1000:
|
| 476 |
success_chunk, the_output_chunk = run_with_timeout(data_preprocess.merge_texts_skipping_overlap,args=(chunk, context))
|
|
|
|
|
|
|
|
|
|
| 477 |
if success_chunk:
|
| 478 |
chunk = the_output_chunk#data_preprocess.merge_texts_skipping_overlap(all_output, final_input_link)
|
| 479 |
print("yes succeed for chunk")
|
|
@@ -492,6 +514,9 @@ def pipeline_with_gemini(accessions,niche_cases=None):
|
|
| 492 |
if len(data_preprocess.normalize_for_overlap(all_output)) < int(100000) and len(final_input_link)<100000:
|
| 493 |
print("Running merge_texts_skipping_overlap with timeout")
|
| 494 |
success, the_output = run_with_timeout(data_preprocess.merge_texts_skipping_overlap,args=(all_output, final_input_link),timeout=30)
|
|
|
|
|
|
|
|
|
|
| 495 |
print("Returned from timeout logic")
|
| 496 |
if success:
|
| 497 |
all_output = the_output#data_preprocess.merge_texts_skipping_overlap(all_output, final_input_link)
|
|
@@ -511,7 +536,9 @@ def pipeline_with_gemini(accessions,niche_cases=None):
|
|
| 511 |
print("basic fall back")
|
| 512 |
print("len all output after: ", len(all_output))
|
| 513 |
#country_pro, chunk, all_output = data_preprocess.process_inputToken(links, saveLinkFolder, accession=accession, isolate=isolate)
|
| 514 |
-
|
|
|
|
|
|
|
| 515 |
else:
|
| 516 |
chunk = "Collection_date: " + col_date +". Isolate: " + iso + ". Title: " + title + ". Features: " + features
|
| 517 |
all_output = "Collection_date: " + col_date +". Isolate: " + iso + ". Title: " + title + ". Features: " + features
|
|
@@ -621,7 +648,9 @@ def pipeline_with_gemini(accessions,niche_cases=None):
|
|
| 621 |
print("\nRAG assets loaded from file. No re-embedding of entire document will occur.")
|
| 622 |
plain_text_content_all, table_strings_all, document_title_all = model.read_docx_text(file_all_path)
|
| 623 |
master_structured_lookup['document_title'] = master_structured_lookup.get('document_title', document_title_all)
|
| 624 |
-
|
|
|
|
|
|
|
| 625 |
primary_word = iso
|
| 626 |
alternative_word = acc
|
| 627 |
print(f"\n--- General Query: Primary='{primary_word}' (Alternative='{alternative_word}') ---")
|
|
@@ -634,6 +663,9 @@ def pipeline_with_gemini(accessions,niche_cases=None):
|
|
| 634 |
print(chunk)
|
| 635 |
print("this is all output for the model")
|
| 636 |
print(all_output)
|
|
|
|
|
|
|
|
|
|
| 637 |
country, sample_type, method_used, country_explanation, sample_type_explanation, total_query_cost = model.query_document_info(
|
| 638 |
primary_word, alternative_word, meta, master_structured_lookup, faiss_index, document_chunks,
|
| 639 |
model.call_llm_api, chunk=chunk, all_output=all_output)
|
|
@@ -687,6 +719,9 @@ def pipeline_with_gemini(accessions,niche_cases=None):
|
|
| 687 |
if len(method_used + sample_type_explanation)> 0:
|
| 688 |
acc_score["sample_type"][sample_type.lower()] = [method_used + sample_type_explanation]
|
| 689 |
total_cost_title += total_query_cost
|
|
|
|
|
|
|
|
|
|
| 690 |
# last resort: combine all information to give all output otherwise unknown
|
| 691 |
if len(acc_score["country"]) == 0 or len(acc_score["sample_type"]) == 0 or acc_score["country"] == "unknown" or acc_score["sample_type"] == "unknown":
|
| 692 |
text = ""
|
|
|
|
| 245 |
seen = set()
|
| 246 |
return [x for x in seq if not (x in seen or seen.add(x))]
|
| 247 |
# Main execution
|
| 248 |
+
def pipeline_with_gemini(accessions,stop_flag=None, niche_cases=None):
|
| 249 |
# output: country, sample_type, ethnic, location, money_cost, time_cost, explain
|
| 250 |
# there can be one accession number in the accessions
|
| 251 |
# Prices are per 1,000 tokens
|
| 252 |
+
# Before each big step:
|
| 253 |
+
if stop_flag is not None and stop_flag.value:
|
| 254 |
+
print(f"π Stop detected before starting {accession}, aborting early...")
|
| 255 |
+
return {}
|
| 256 |
PRICE_PER_1K_INPUT_LLM = 0.000075 # $0.075 per 1M tokens
|
| 257 |
PRICE_PER_1K_OUTPUT_LLM = 0.0003 # $0.30 per 1M tokens
|
| 258 |
PRICE_PER_1K_EMBEDDING_INPUT = 0.000025 # $0.025 per 1M tokens
|
|
|
|
| 334 |
file_all_path = os.path.join(LOCAL_TEMP_DIR, all_filename)
|
| 335 |
# file_chunk_path = os.path.join(tempfile.gettempdir(), chunk_filename)
|
| 336 |
# file_all_path = os.path.join(tempfile.gettempdir(), all_filename)
|
| 337 |
+
if stop_flag is not None and stop_flag.value:
|
| 338 |
+
print(f"π Stop processing {accession}, aborting early...")
|
| 339 |
+
return {}
|
| 340 |
print(file_chunk_path)
|
| 341 |
chunk_id = find_drive_file(chunk_filename, sample_folder_id)
|
| 342 |
all_id = find_drive_file(all_filename, sample_folder_id)
|
|
|
|
| 393 |
accession, isolate = None, None
|
| 394 |
if acc != "unknown": accession = acc
|
| 395 |
if iso != "unknown": isolate = iso
|
| 396 |
+
if stop_flag is not None and stop_flag.value:
|
| 397 |
+
print(f"π Stop processing {accession}, aborting early...")
|
| 398 |
+
return {}
|
| 399 |
# check doi first
|
| 400 |
if doi != "unknown":
|
| 401 |
link = 'https://doi.org/' + doi
|
|
|
|
| 423 |
# filter the quality link
|
| 424 |
print("saveLinkFolder as sample folder id: ", sample_folder_id)
|
| 425 |
print("start the smart filter link")
|
| 426 |
+
if stop_flag is not None and stop_flag.value:
|
| 427 |
+
print(f"π Stop processing {accession}, aborting early...")
|
| 428 |
+
return {}
|
| 429 |
# success_process, output_process = run_with_timeout(smart_fallback.filter_links_by_metadata,args=(tem_links,sample_folder_id),kwargs={"accession":acc})
|
| 430 |
# if success_process:
|
| 431 |
# links = output_process
|
|
|
|
| 452 |
# if not all_output:
|
| 453 |
# text_all, table_all, document_title_all = model.read_docx_text(file_all_path)
|
| 454 |
# all_output = data_preprocess.normalize_for_overlap(text_all) + "\n" + data_preprocess.normalize_for_overlap(". ".join(table_all))
|
| 455 |
+
if stop_flag is not None and stop_flag.value:
|
| 456 |
+
print(f"π Stop processing {accession}, aborting early...")
|
| 457 |
+
return {}
|
| 458 |
if chunk_exists:
|
| 459 |
print("File chunk exists!")
|
| 460 |
if not chunk:
|
|
|
|
| 482 |
else: query_kw = acc
|
| 483 |
#text_link, tables_link, final_input_link = data_preprocess.preprocess_document(link,saveLinkFolder, isolate=query_kw)
|
| 484 |
success_process, output_process = run_with_timeout(data_preprocess.preprocess_document,args=(link,sample_folder_id),kwargs={"isolate":query_kw,"accession":acc},timeout=100)
|
| 485 |
+
if stop_flag is not None and stop_flag.value:
|
| 486 |
+
print(f"π Stop processing {accession}, aborting early...")
|
| 487 |
+
return {}
|
| 488 |
if success_process:
|
| 489 |
text_link, tables_link, final_input_link = output_process[0], output_process[1], output_process[2]
|
| 490 |
print("yes succeed for process document")
|
|
|
|
| 493 |
if context != "Sample ID not found.":
|
| 494 |
if len(data_preprocess.normalize_for_overlap(chunk)) < 1000*1000:
|
| 495 |
success_chunk, the_output_chunk = run_with_timeout(data_preprocess.merge_texts_skipping_overlap,args=(chunk, context))
|
| 496 |
+
if stop_flag is not None and stop_flag.value:
|
| 497 |
+
print(f"π Stop processing {accession}, aborting early...")
|
| 498 |
+
return {}
|
| 499 |
if success_chunk:
|
| 500 |
chunk = the_output_chunk#data_preprocess.merge_texts_skipping_overlap(all_output, final_input_link)
|
| 501 |
print("yes succeed for chunk")
|
|
|
|
| 514 |
if len(data_preprocess.normalize_for_overlap(all_output)) < int(100000) and len(final_input_link)<100000:
|
| 515 |
print("Running merge_texts_skipping_overlap with timeout")
|
| 516 |
success, the_output = run_with_timeout(data_preprocess.merge_texts_skipping_overlap,args=(all_output, final_input_link),timeout=30)
|
| 517 |
+
if stop_flag is not None and stop_flag.value:
|
| 518 |
+
print(f"π Stop processing {accession}, aborting early...")
|
| 519 |
+
return {}
|
| 520 |
print("Returned from timeout logic")
|
| 521 |
if success:
|
| 522 |
all_output = the_output#data_preprocess.merge_texts_skipping_overlap(all_output, final_input_link)
|
|
|
|
| 536 |
print("basic fall back")
|
| 537 |
print("len all output after: ", len(all_output))
|
| 538 |
#country_pro, chunk, all_output = data_preprocess.process_inputToken(links, saveLinkFolder, accession=accession, isolate=isolate)
|
| 539 |
+
if stop_flag is not None and stop_flag.value:
|
| 540 |
+
print(f"π Stop processing {accession}, aborting early...")
|
| 541 |
+
return {}
|
| 542 |
else:
|
| 543 |
chunk = "Collection_date: " + col_date +". Isolate: " + iso + ". Title: " + title + ". Features: " + features
|
| 544 |
all_output = "Collection_date: " + col_date +". Isolate: " + iso + ". Title: " + title + ". Features: " + features
|
|
|
|
| 648 |
print("\nRAG assets loaded from file. No re-embedding of entire document will occur.")
|
| 649 |
plain_text_content_all, table_strings_all, document_title_all = model.read_docx_text(file_all_path)
|
| 650 |
master_structured_lookup['document_title'] = master_structured_lookup.get('document_title', document_title_all)
|
| 651 |
+
if stop_flag is not None and stop_flag.value:
|
| 652 |
+
print(f"π Stop processing {accession}, aborting early...")
|
| 653 |
+
return {}
|
| 654 |
primary_word = iso
|
| 655 |
alternative_word = acc
|
| 656 |
print(f"\n--- General Query: Primary='{primary_word}' (Alternative='{alternative_word}') ---")
|
|
|
|
| 663 |
print(chunk)
|
| 664 |
print("this is all output for the model")
|
| 665 |
print(all_output)
|
| 666 |
+
if stop_flag is not None and stop_flag.value:
|
| 667 |
+
print(f"π Stop processing {accession}, aborting early...")
|
| 668 |
+
return {}
|
| 669 |
country, sample_type, method_used, country_explanation, sample_type_explanation, total_query_cost = model.query_document_info(
|
| 670 |
primary_word, alternative_word, meta, master_structured_lookup, faiss_index, document_chunks,
|
| 671 |
model.call_llm_api, chunk=chunk, all_output=all_output)
|
|
|
|
| 719 |
if len(method_used + sample_type_explanation)> 0:
|
| 720 |
acc_score["sample_type"][sample_type.lower()] = [method_used + sample_type_explanation]
|
| 721 |
total_cost_title += total_query_cost
|
| 722 |
+
if stop_flag is not None and stop_flag.value:
|
| 723 |
+
print(f"π Stop processing {accession}, aborting early...")
|
| 724 |
+
return {}
|
| 725 |
# last resort: combine all information to give all output otherwise unknown
|
| 726 |
if len(acc_score["country"]) == 0 or len(acc_score["sample_type"]) == 0 or acc_score["country"] == "unknown" or acc_score["sample_type"] == "unknown":
|
| 727 |
text = ""
|